From 7c451a5f288fdc8b51adaea0ab3de7c2fb725b23 Mon Sep 17 00:00:00 2001 From: Stefan Gula Date: Fri, 1 Mar 2024 13:42:03 +0100 Subject: [PATCH] schema: adds PNode and its exact variants This patch introduces PNode alternative to SNode, which allows to travers parsed module tree including groupings, uses and other special nodes Signed-off-by: Stefan Gula --- cffi/cdefs.h | 96 ++++ libyang/__init__.py | 35 ++ libyang/schema.py | 760 +++++++++++++++++++++++++++- libyang/util.py | 8 + tests/test_schema.py | 251 +++++++++ tests/yang/yolo/yolo-nodetypes.yang | 31 ++ 6 files changed, 1168 insertions(+), 13 deletions(-) diff --git a/cffi/cdefs.h b/cffi/cdefs.h index 7bf1733a..b3d8b587 100644 --- a/cffi/cdefs.h +++ b/cffi/cdefs.h @@ -549,6 +549,25 @@ typedef enum { char* lysc_path(const struct lysc_node *, LYSC_PATH_TYPE, char *, size_t); +struct lysp_when { + const char *cond; + ...; +}; + +struct lysp_refine { + const char *nodeid; + const char *dsc; + const char *ref; + struct lysp_qname *iffeatures; + struct lysp_restr *musts; + const char *presence; + struct lysp_qname *dflts; + uint32_t min; + uint32_t max; + struct lysp_ext_instance *exts; + uint16_t flags; +}; + struct lysp_node_container { struct lysp_restr *musts; struct lysp_when *when; @@ -636,6 +655,73 @@ struct lysp_node_list { ...; }; +struct lysp_node_choice { + struct lysp_node *child; + struct lysp_when *when; + struct lysp_qname dflt; + ...; +}; + +struct lysp_node_case { + struct lysp_node *child; + struct lysp_when *when; + ...; +}; + +struct lysp_node_anydata { + struct lysp_restr *musts; + struct lysp_when *when; + ...; +}; + +struct lysp_node_uses { + struct lysp_refine *refines; + struct lysp_node_augment *augments; + struct lysp_when *when; + ...; +}; + +struct lysp_node_action_inout { + struct lysp_restr *musts; + struct lysp_tpdf *typedefs; + struct lysp_node_grp *groupings; + struct lysp_node *child; + ...; +}; + +struct lysp_node_action { + struct lysp_tpdf *typedefs; + struct lysp_node_grp *groupings; + struct lysp_node_action_inout input; + struct lysp_node_action_inout output; + ...; +}; + +struct lysp_node_notif { + struct lysp_restr *musts; + struct lysp_tpdf *typedefs; + struct lysp_node_grp *groupings; + struct lysp_node *child; + ...; +}; + +struct lysp_node_grp { + struct lysp_tpdf *typedefs; + struct lysp_node_grp *groupings; + struct lysp_node *child; + struct lysp_node_action *actions; + struct lysp_node_notif *notifs; + ...; +}; + +struct lysp_node_augment { + struct lysp_node *child; + struct lysp_when *when; + struct lysp_node_action *actions; + struct lysp_node_notif *notifs; + ...; +}; + struct lysc_type { const char *name; struct lysc_ext_instance *exts; @@ -644,6 +730,16 @@ struct lysc_type { uint32_t refcount; }; +struct lysp_type_enum { + const char *name; + const char *dsc; + const char *ref; + int64_t value; + struct lysp_qname *iffeatures; + struct lysp_ext_instance *exts; + uint16_t flags; +}; + struct lysp_type { const char *name; struct lysp_restr *range; diff --git a/libyang/__init__.py b/libyang/__init__.py index 92e06f22..762225de 100644 --- a/libyang/__init__.py +++ b/libyang/__init__.py @@ -79,8 +79,26 @@ IfOrFeatures, Module, Must, + PAction, + PActionInOut, + PAnydata, Pattern, + PAugment, + PCase, + PChoice, + PContainer, + PEnum, + PGrouping, + PLeaf, + PLeafList, + PList, + PNode, + PNotif, + PRefine, + PType, + PUses, Revision, + SAnydata, SCase, SChoice, SContainer, @@ -156,6 +174,23 @@ "NodeTypeRemoved", "OrderedByUserAdded", "OrderedByUserRemoved", + "PAction", + "PActionInOut", + "PAnydata", + "PAugment", + "PCase", + "PChoice", + "PContainer", + "PEnum", + "PGrouping", + "PLeaf", + "PLeafList", + "PList", + "PNode", + "PNotif", + "PRefine", + "PType", + "PUses", "Pattern", "PatternAdded", "PatternRemoved", diff --git a/libyang/schema.py b/libyang/schema.py index a4a59fa6..a77d1d50 100644 --- a/libyang/schema.py +++ b/libyang/schema.py @@ -6,7 +6,15 @@ from typing import IO, Any, Dict, Iterator, List, Optional, Tuple, Union from _libyang import ffi, lib -from .util import IOType, LibyangError, c2str, init_output, ly_array_iter, str2c +from .util import ( + IOType, + LibyangError, + c2str, + init_output, + ly_array_iter, + ly_list_iter, + str2c, +) # ------------------------------------------------------------------------------------- @@ -42,6 +50,18 @@ def printer_flags( return flags +# ------------------------------------------------------------------------------------- +def _print_qname(qname, module: "Module") -> str: + module_str = ( + f"{c2str(qname.mod.mod.name)}:" + if qname.mod not in (ffi.NULL, module.cdata.parsed) + else None + ) + ret = module_str if module_str is not None else "" + ret += c2str(qname.str) + return ret + + # ------------------------------------------------------------------------------------- class Module: __slots__ = ("context", "cdata", "__dict__") @@ -144,6 +164,26 @@ def children( self.context, self.cdata, types=types, with_choice=with_choice ) + def parsed_children(self) -> Iterator["PNode"]: + for c in ly_list_iter(self.cdata.parsed.data): + yield PNode.new(self.context, c, self) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata.parsed.groupings): + yield PGrouping(self.context, g, self) + + def augments(self) -> Iterator["PAugment"]: + for a in ly_array_iter(self.cdata.parsed.augments): + yield PAugment(self.context, a, self) + + def actions(self) -> Iterator["PAction"]: + for a in ly_list_iter(self.cdata.parsed.rpcs): + yield PAction(self.context, a, self) + + def notifications(self) -> Iterator["PNotif"]: + for n in ly_list_iter(self.cdata.parsed.notifs): + yield PNotif(self.context, n, self) + def __str__(self) -> str: return self.name() @@ -300,7 +340,7 @@ class Import: def __init__(self, context: "libyang.Context", cdata, module): self.context = context - self.cdata = cdata # C type: "struct lysp_revision *" + self.cdata = cdata # C type: "struct lysp_import *" self.module = module def name(self) -> str: @@ -470,20 +510,27 @@ class Bit(_EnumBit): # ------------------------------------------------------------------------------------- class Pattern: - __slots__ = ("context", "cdata") + __slots__ = ("context", "cdata", "cdata_parsed") - def __init__(self, context: "libyang.Context", cdata): + def __init__(self, context: "libyang.Context", cdata, cdata_parsed=None): self.context = context self.cdata = cdata # C type: "struct lysc_pattern *" + self.cdata_parsed = cdata_parsed # C type: "struct lysp_restr *" def expression(self) -> str: + if self.cdata is None: + return c2str(self.cdata_parsed.arg.str + 1) return c2str(self.cdata.expr) def inverted(self) -> bool: + if self.cdata is None: + return self.cdata.arg.str[0] == b"\x15" return self.cdata.inverted def error_message(self) -> Optional[str]: - return c2str(self.cdata.emsg) if self.cdata.emsg != ffi.NULL else None + if self.cdata is None: + return c2str(self.cdata_parsed.emsg) + return c2str(self.cdata.emsg) # ------------------------------------------------------------------------------------- @@ -772,6 +819,11 @@ def __repr__(self): def __str__(self): return self.name() + def parsed(self) -> Optional["PType"]: + if self.cdata_parsed is None or self.cdata_parsed == ffi.NULL: + return None + return PType(self.context, self.cdata_parsed, self.module()) + # ------------------------------------------------------------------------------------- class Typedef: @@ -1094,17 +1146,22 @@ def __str__(self): # ------------------------------------------------------------------------------------- class Must: - __slots__ = ("context", "cdata") + __slots__ = ("context", "cdata", "cdata_parsed") - def __init__(self, context: "libyang.Context", cdata): + def __init__(self, context: "libyang.Context", cdata, cdata_parsed=None): self.context = context self.cdata = cdata # C type: "struct lysc_must *" + self.cdata_parsed = cdata_parsed # C type: "struct lysp_must *" def condition(self) -> str: - return c2str(lib.lyxp_get_expr(self.cdata.cond)) + if self.cdata is not None: + return c2str(lib.lyxp_get_expr(self.cdata.cond)) + return c2str(self.cdata_parsed.arg.str + 1) def error_message(self) -> Optional[str]: - return c2str(self.cdata.emsg) if self.cdata.emsg != ffi.NULL else None + if self.cdata is not None: + return c2str(self.cdata.emsg) + return c2str(self.cdata_parsed.emsg) # ------------------------------------------------------------------------------------- @@ -1256,6 +1313,11 @@ def when_conditions(self): for cond in ly_array_iter(wh): yield c2str(lib.lyxp_get_expr(cond.cond)) + def parsed(self) -> Optional["PNode"]: + if self.cdata_parsed is None or self.cdata_parsed == ffi.NULL: + return None + return PNode.new(self.context, self.cdata_parsed, self.module()) + def iter_tree(self, full: bool = False) -> Iterator["SNode"]: """ Do a DFS walk of the schema node. @@ -1398,7 +1460,7 @@ def defaults(self) -> Iterator[Union[None, bool, int, str, float]]: else: yield val - def max_elements(self) -> int: + def max_elements(self) -> Optional[int]: return ( self.cdata_leaflist.max if self.cdata_leaflist.max != (2**32 - 1) @@ -1522,12 +1584,12 @@ def uniques(self) -> Iterator[List[SNode]]: nodes.append(SNode.new(self.context, node)) yield nodes - def max_elements(self) -> int: - return self.cdata_list.max if self.cdata_list.max != (2**32 - 1) else None - def min_elements(self) -> int: return self.cdata_list.min + def max_elements(self) -> Optional[int]: + return self.cdata_list.max if self.cdata_list.max != (2**32 - 1) else None + def __str__(self): return "%s [%s]" % (self.name(), ", ".join(k.name() for k in self.keys())) @@ -1693,3 +1755,675 @@ def _skip(node) -> bool: Rpc = SRpc RpcInOut = SRpcInOut Anyxml = SAnyxml + + +# ------------------------------------------------------------------------------------- +class PEnum: + __slots__ = ("context", "cdata", "module") + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + self.context = context + self.cdata = cdata # C type of "struct lysp_type_enum *" + self.module = module + + def name(self) -> str: + return c2str(self.cdata.name) + + def description(self) -> Optional[str]: + return c2str(self.cdata.dsc) + + def reference(self) -> Optional[str]: + return c2str(self.cdata.ref) + + def value(self) -> int: + return self.cdata.value + + def if_features(self) -> Iterator[IfFeatureExpr]: + for f in ly_array_iter(self.cdata.iffeatures): + yield IfFeatureExpr(self.context, f, list(self.module.features())) + + def extensions(self) -> Iterator["ExtensionParsed"]: + for ext in ly_array_iter(self.cdata.exts): + yield ExtensionParsed(self.context, ext, self.module) + + +# ------------------------------------------------------------------------------------- +class PType: + __slots__ = ("context", "cdata", "module") + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + self.context = context + self.cdata = cdata # C type of "struct lysp_type *" + self.module = module + + def name(self) -> str: + return c2str(self.cdata.name) + + def range(self) -> Optional[str]: + if self.cdata.range == ffi.NULL: + return None + return _print_qname(self.cdata.range.arg, self.module) + + def length(self) -> Optional[str]: + if self.cdata.length == ffi.NULL: + return None + return _print_qname(self.cdata.length.arg, self.module) + + def patterns(self) -> Iterator[Pattern]: + for p in ly_array_iter(self.cdata.patterns): + yield Pattern(self.context, None, p) + + def enums(self) -> Iterator[PEnum]: + for e in ly_array_iter(self.cdata.enums): + yield PEnum(self.context, e, self.module) + + def bits(self) -> Iterator[PEnum]: + for b in ly_array_iter(self.cdata.bits): + yield PEnum(self.context, b, self.module) + + def path(self) -> Optional[str]: + if self.cdata.path == ffi.NULL: + return None + return c2str(lib.lyxp_get_expr(self.cdata.path)) + + def bases(self) -> Iterator[str]: + for b in ly_array_iter(self.cdata.bases): + yield c2str(b) + + def types(self) -> Iterator["PType"]: + for t in ly_array_iter(self.cdata.types): + yield PType(self.context, t, self.module) + + def extensions(self) -> Iterator["ExtensionParsed"]: + for ext in ly_array_iter(self.cdata.exts): + yield ExtensionParsed(self.context, ext, self.module) + + def pmod(self) -> Optional[Module]: + if self.cdata.pmod == ffi.NULL: + return None + return Module(self.context, self.cdata.pmod.mod) + + def compiled(self) -> Optional[Type]: + if self.cdata.compiled == ffi.NULL: + return None + return Type(self.context, self.cdata.compiled, self.cdata) + + def fraction_digits(self) -> int: + return self.cdata.fraction_digits + + def require_instance(self) -> bool: + return self.cdata.require_instance + + +# ------------------------------------------------------------------------------------- +class PRefine: + __slots__ = ("context", "cdata", "module") + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + self.context = context + self.cdata = cdata # C type of "struct lysp_refine *" + self.module = module + + def nodeid(self) -> str: + return c2str(self.cdata.nodeid) + + def description(self) -> Optional[str]: + return c2str(self.cdata.dsc) + + def reference(self) -> Optional[str]: + return c2str(self.cdata.ref) + + def if_features(self) -> Iterator[IfFeatureExpr]: + for f in ly_array_iter(self.cdata.iffeatures): + yield IfFeatureExpr(self.context, f, list(self.module.features())) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata.musts): + yield Must(self.context, None, m) + + def presence(self) -> Optional[str]: + return c2str(self.cdata.presence) + + def defaults(self) -> Iterator[str]: + for d in ly_array_iter(self.cdata.dflts): + yield _print_qname(d, self.module) + + def min_elements(self) -> int: + return self.cdata.min + + def max_elements(self) -> Optional[int]: + return self.cdata.max if self.cdata.max != 0 else None + + def extensions(self) -> Iterator["ExtensionParsed"]: + for ext in ly_array_iter(self.cdata.exts): + yield ExtensionParsed(self.context, ext, self.module) + + +# ------------------------------------------------------------------------------------- +class PNode: + CONTAINER = lib.LYS_CONTAINER + CHOICE = lib.LYS_CHOICE + CASE = lib.LYS_CASE + LEAF = lib.LYS_LEAF + LEAFLIST = lib.LYS_LEAFLIST + LIST = lib.LYS_LIST + RPC = lib.LYS_RPC + ACTION = lib.LYS_ACTION + INPUT = lib.LYS_INPUT + OUTPUT = lib.LYS_OUTPUT + NOTIF = lib.LYS_NOTIF + ANYXML = lib.LYS_ANYXML + ANYDATA = lib.LYS_ANYDATA + AUGMENT = lib.LYS_AUGMENT + USES = lib.LYS_USES + GROUPING = lib.LYS_GROUPING + KEYWORDS = { + CONTAINER: "container", + LEAF: "leaf", + LEAFLIST: "leaf-list", + LIST: "list", + RPC: "rpc", + ACTION: "action", + INPUT: "input", + OUTPUT: "output", + NOTIF: "notification", + ANYXML: "anyxml", + ANYDATA: "anydata", + AUGMENT: "augment", + USES: "uses", + GROUPING: "grouping", + } + + __slots__ = ("context", "cdata", "module", "__dict__") + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + self.context = context + self.cdata = cdata # C type of "struct lysp_node *" + self.module = module + + def parent(self) -> Optional["PNode"]: + if self.cdata.parent == ffi.NULL: + return None + return PNode.new(self.context, self.cdata.parent, self.module) + + def nodetype(self) -> int: + return self.cdata.nodetype + + def siblings(self) -> Iterator["PNode"]: + for s in ly_list_iter(self.cdata.next): + yield PNode.new(self.context, s, self.module) + + def name(self) -> str: + return c2str(self.cdata.name) + + def description(self) -> Optional[str]: + return c2str(self.cdata.dsc) + + def reference(self) -> Optional[str]: + return c2str(self.cdata.ref) + + def if_features(self) -> Iterator[IfFeatureExpr]: + for f in ly_array_iter(self.cdata.iffeatures): + yield IfFeatureExpr(self.context, f, list(self.module.features())) + + def extensions(self) -> Iterator["ExtensionParsed"]: + for ext in ly_array_iter(self.cdata.exts): + yield ExtensionParsed(self.context, ext, self.module) + + def config_set(self) -> bool: + return bool(self.cdata.flags & lib.LYS_SET_CONFIG) + + def config_false(self) -> bool: + return bool(self.cdata.flags & lib.LYS_CONFIG_R) + + def mandatory(self) -> bool: + return bool(self.cdata.flags & lib.LYS_MAND_TRUE) + + def deprecated(self) -> bool: + return bool(self.cdata.flags & lib.LYS_STATUS_DEPRC) + + def obsolete(self) -> bool: + return bool(self.cdata.flags & lib.LYS_STATUS_OBSLT) + + def status(self) -> str: + if self.cdata.flags & lib.LYS_STATUS_OBSLT: + return "obsolete" + if self.cdata.flags & lib.LYS_STATUS_DEPRC: + return "deprecated" + return "current" + + def __repr__(self): + cls = self.__class__ + return "<%s.%s: %s>" % (cls.__module__, cls.__name__, str(self)) + + def __str__(self): + return self.name() + + NODETYPE_CLASS = {} + + @staticmethod + def register(nodetype): + def _decorator(nodeclass): + PNode.NODETYPE_CLASS[nodetype] = nodeclass + return nodeclass + + return _decorator + + @staticmethod + def new(context: "libyang.Context", cdata, module: Module) -> "PNode": + cdata = ffi.cast("struct lysp_node *", cdata) + nodecls = PNode.NODETYPE_CLASS.get(cdata.nodetype, None) + if nodecls is None: + raise TypeError("node type %s not implemented" % cdata.nodetype) + return nodecls(context, cdata, module) + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.CONTAINER) +class PContainer(PNode): + __slots__ = ("cdata_container",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_container = ffi.cast("struct lysp_node_container *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_container.musts): + yield Must(self.context, None, m) + + def when_condition(self) -> Optional[str]: + if self.cdata_container.when == ffi.NULL: + return None + return c2str(self.cdata_container.when.cond) + + def presence(self) -> Optional[str]: + return c2str(self.cdata_container.presence) + + def typedefs(self) -> Iterator[Typedef]: + for t in ly_array_iter(self.cdata_container.typedefs): + yield Typedef(self.context, t) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata_container.groupings): + yield PGrouping(self.context, g, self.module) + + def children(self) -> Iterator[PNode]: + for c in ly_list_iter(self.cdata_container.child): + yield PNode.new(self.context, c, self.module) + + def actions(self) -> Iterator["PAction"]: + for a in ly_list_iter(self.cdata_container.actions): + yield PAction(self.context, a, self.module) + + def notifications(self) -> Iterator["PNotif"]: + for n in ly_list_iter(self.cdata_container.notifs): + yield PNotif(self.context, n, self.module) + + def __iter__(self) -> Iterator[PNode]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.LEAF) +class PLeaf(PNode): + __slots__ = ("cdata_leaf",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_leaf = ffi.cast("struct lysp_node_leaf *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_leaf.musts): + yield Must(self.context, None, m) + + def when_condition(self) -> Optional[str]: + if self.cdata_leaf.when == ffi.NULL: + return None + return c2str(self.cdata_leaf.when.cond) + + def type(self) -> PType: + return PType(self.context, self.cdata_leaf.type, self.module) + + def units(self) -> Optional[str]: + return c2str(self.cdata_leaf.units) + + def default(self) -> Optional[str]: + return _print_qname(self.cdata_leaf.dflt, self.module) + + def is_key(self) -> bool: + if self.cdata.flags & lib.LYS_KEY: + return True + return False + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.LEAFLIST) +class PLeafList(PNode): + __slots__ = ("cdata_leaflist",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_leaflist = ffi.cast("struct lysp_node_leaflist *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_leaflist.musts): + yield Must(self.context, None, m) + + def when_condition(self) -> Optional[str]: + if self.cdata_leaflist.when == ffi.NULL: + return None + return c2str(self.cdata_leaflist.when.cond) + + def type(self) -> PType: + return PType(self.context, self.cdata_leaflist.type, self.module) + + def units(self) -> Optional[str]: + return c2str(self.cdata_leaflist.units) + + def defaults(self) -> Iterator[str]: + for d in ly_array_iter(self.cdata_leaflist.dflts): + yield _print_qname(d, self.module) + + def min_elements(self) -> int: + return self.cdata_leaflist.min + + def max_elements(self) -> Optional[int]: + return self.cdata_leaflist.max if self.cdata_leaflist.max != 0 else None + + def ordered(self) -> bool: + return bool(self.cdata.flags & lib.LYS_ORDBY_USER) + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.LIST) +class PList(PNode): + __slots__ = ("cdata_list",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_list = ffi.cast("struct lysp_node_list *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_list.musts): + yield Must(self.context, None, m) + + def when_condition(self) -> Optional[str]: + if self.cdata_list.when == ffi.NULL: + return None + return c2str(self.cdata_list.when.cond) + + def key(self) -> Optional[str]: + return c2str(self.cdata_list.key) + + def typedefs(self) -> Iterator[Typedef]: + for t in ly_array_iter(self.cdata_list.typedefs): + yield Typedef(self.context, t) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata_list.groupings): + yield PGrouping(self.context, g, self.module) + + def children(self) -> Iterator[PNode]: + for c in ly_list_iter(self.cdata_list.child): + yield PNode.new(self.context, c, self.module) + + def actions(self) -> Iterator["PAction"]: + for a in ly_list_iter(self.cdata_list.actions): + yield PAction(self.context, a, self.module) + + def notifications(self) -> Iterator["PNotif"]: + for n in ly_list_iter(self.cdata_list.notifs): + yield PNotif(self.context, n, self.module) + + def uniques(self) -> Iterator[str]: + for u in ly_array_iter(self.cdata_list.uniques): + yield _print_qname(u, self.module) + + def min_elements(self) -> int: + return self.cdata_list.min + + def max_elements(self) -> Optional[int]: + return self.cdata_list.max if self.cdata_list.max != 0 else None + + def ordered(self) -> bool: + return bool(self.cdata.flags & lib.LYS_ORDBY_USER) + + def __iter__(self) -> Iterator[PNode]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.CASE) +class PCase(PNode): + __slots__ = ("cdata_case",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_case = ffi.cast("struct lysp_node_case *", cdata) + + def children(self) -> Iterator[PNode]: + for c in ly_list_iter(self.cdata_case.child): + yield PNode.new(self.context, c, self.module) + + def when_condition(self) -> Optional[str]: + if self.cdata_case.when == ffi.NULL: + return None + return c2str(self.cdata_case.when.cond) + + def __iter__(self) -> Iterator[PNode]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.CHOICE) +class PChoice(PNode): + __slots__ = ("cdata_choice",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_choice = ffi.cast("struct lysp_node_choice *", cdata) + + def children(self) -> Iterator[PCase]: + for c in ly_list_iter(self.cdata_choice.child): + yield PCase(self.context, c, self.module) + + def when_condition(self) -> Optional[str]: + if self.cdata_choice.when == ffi.NULL: + return None + return c2str(self.cdata_choice.when.cond) + + def default(self) -> Optional[str]: + return _print_qname(self.cdata_choice.dflt, self.module) + + def __iter__(self) -> Iterator[PCase]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.ANYXML) +@PNode.register(PNode.ANYDATA) +class PAnydata(PNode): + __slots__ = ("cdata_anydata",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_anydata = ffi.cast("struct lysp_node_anydata *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_anydata.musts): + yield Must(self.context, None, m) + + def when_condition(self) -> Optional[str]: + if self.cdata_anydata.when == ffi.NULL: + return None + return c2str(self.cdata_anydata.when.cond) + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.AUGMENT) +class PAugment(PNode): + __slots__ = ("cdata_augment",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_augment = ffi.cast("struct lysp_node_augment *", cdata) + + def children(self) -> Iterator["PNode"]: + for c in ly_list_iter(self.cdata_augment.child): + yield PNode.new(self.context, c, self.module) + + def when_condition(self) -> Optional[str]: + if self.cdata_augment.when == ffi.NULL: + return None + return c2str(self.cdata_augment.when.cond) + + def actions(self) -> Iterator["PAction"]: + for a in ly_list_iter(self.cdata_augment.actions): + yield PAction(self.context, a, self.module) + + def notifications(self) -> Iterator["PNotif"]: + for n in ly_list_iter(self.cdata_augment.notifs): + yield PNotif(self.context, n, self.module) + + def __iter__(self) -> Iterator[PNode]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.USES) +class PUses(PNode): + __slots__ = ("cdata_uses",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_uses = ffi.cast("struct lysp_node_uses *", cdata) + + def refines(self) -> Iterator[PRefine]: + for r in ly_array_iter(self.cdata_uses.refines): + yield PRefine(self.context, r, self.module) + + def augments(self) -> Iterator[PAugment]: + for a in ly_list_iter(self.cdata_uses.augments): + yield PAugment(self.context, a, self.module) + + def when_condition(self) -> Optional[str]: + if self.cdata_uses.when == ffi.NULL: + return None + return c2str(self.cdata_uses.when.cond) + + +# ------------------------------------------------------------------------------------- +class PActionInOut(PNode): + __slots__ = ("cdata_action_inout",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_action_inout = ffi.cast("struct lysp_node_action_inout *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_action_inout.musts): + yield Must(self.context, None, m) + + def typedefs(self) -> Iterator[Typedef]: + for t in ly_array_iter(self.cdata_action_inout.typedefs): + yield Typedef(self.context, t) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata_action_inout.groupings): + yield PGrouping(self.context, g, self.module) + + def children(self) -> Iterator[PNode]: + for c in ly_list_iter(self.cdata_action_inout.child): + yield PNode.new(self.context, c, self.module) + + def __iter__(self) -> Iterator[PNode]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.RPC) +@PNode.register(PNode.ACTION) +class PAction(PNode): + __slots__ = ("cdata_action",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_action = ffi.cast("struct lysp_node_action *", cdata) + + def typedefs(self) -> Iterator[Typedef]: + for t in ly_array_iter(self.cdata_action.typedefs): + yield Typedef(self.context, t) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata_action.groupings): + yield PGrouping(self.context, g, self.module) + + def input(self) -> PActionInOut: + ptr = ffi.addressof(self.cdata_action.input) + return PActionInOut(self.context, ptr, self.module) + + def output(self) -> PActionInOut: + ptr = ffi.addressof(self.cdata_action.output) + return PActionInOut(self.context, ptr, self.module) + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.NOTIF) +class PNotif(PNode): + __slots__ = ("cdata_notif",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_notif = ffi.cast("struct lysp_node_notif *", cdata) + + def musts(self) -> Iterator[Must]: + for m in ly_array_iter(self.cdata_notif.musts): + yield Must(self.context, None, m) + + def typedefs(self) -> Iterator[Typedef]: + for t in ly_array_iter(self.cdata_notif.typedefs): + yield Typedef(self.context, t) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata_notif.groupings): + yield PGrouping(self.context, g, self.module) + + def children(self) -> Iterator[PNode]: + for c in ly_list_iter(self.cdata_notif.child): + yield PNode.new(self.context, c, self.module) + + def __iter__(self) -> Iterator[PNode]: + return self.children() + + +# ------------------------------------------------------------------------------------- +@PNode.register(PNode.GROUPING) +class PGrouping(PNode): + __slots__ = ("cdata_grouping",) + + def __init__(self, context: "libyang.Context", cdata, module: Module) -> None: + super().__init__(context, cdata, module) + self.cdata_grouping = ffi.cast("struct lysp_node_grp *", cdata) + + def typedefs(self) -> Iterator[Typedef]: + for t in ly_array_iter(self.cdata_grouping.typedefs): + yield Typedef(self.context, t) + + def groupings(self) -> Iterator["PGrouping"]: + for g in ly_list_iter(self.cdata_grouping.groupings): + yield PGrouping(self.context, g, self.module) + + def children(self) -> Iterator[PNode]: + for c in ly_list_iter(self.cdata_grouping.child): + yield PNode.new(self.context, c, self.module) + + def actions(self) -> Iterator[PAction]: + for a in ly_list_iter(self.cdata_grouping.actions): + yield PAction(self.context, a, self.module) + + def notifications(self) -> Iterator[PNotif]: + for n in ly_list_iter(self.cdata_grouping.notifs): + yield PNotif(self.context, n, self.module) + + def __iter__(self) -> Iterator[PNode]: + return self.children() diff --git a/libyang/util.py b/libyang/util.py index 9554356e..d640a511 100644 --- a/libyang/util.py +++ b/libyang/util.py @@ -59,6 +59,14 @@ def ly_array_iter(cdata): yield cdata[i] +# ------------------------------------------------------------------------------------- +def ly_list_iter(cdata): + item = cdata + while item != ffi.NULL: + yield item + item = item.next + + # ------------------------------------------------------------------------------------- class IOType(enum.Enum): FD = enum.auto() diff --git a/tests/test_schema.py b/tests/test_schema.py index 64a8e30e..b9277bea 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -13,8 +13,25 @@ LibyangError, Module, Must, + PAction, + PActionInOut, + PAnydata, Pattern, + PAugment, + PCase, + PChoice, + PContainer, + PGrouping, + PLeaf, + PLeafList, + PList, + PNode, + PNotif, + PRefine, + PType, + PUses, Revision, + SAnydata, SCase, SChoice, SContainer, @@ -287,6 +304,79 @@ def test_iter_tree(self): tree = list(self.container.iter_tree(full=True)) self.assertEqual(len(tree), 25) + def test_container_parsed(self): + pnode = self.container.parsed() + self.assertIsInstance(pnode, PContainer) + self.assertIsNone(next(pnode.musts(), None)) + self.assertIsNone(pnode.when_condition()) + self.assertIsNone(pnode.presence()) + self.assertIsNone(next(pnode.typedefs(), None)) + self.assertIsNone(next(pnode.groupings(), None)) + self.assertIsNotNone(next(iter(pnode))) + self.assertIsNone(next(pnode.actions(), None)) + self.assertIsNone(next(pnode.notifications(), None)) + + +# ------------------------------------------------------------------------------------- +class UsesTest(unittest.TestCase): + def setUp(self): + self.ctx = Context(YANG_DIR) + mod = self.ctx.load_module("yolo-nodetypes") + mod.feature_enable_all() + + def tearDown(self): + self.ctx.destroy() + self.ctx = None + + def test_uses_parsed(self): + snode = next(self.ctx.find_path("/yolo-nodetypes:cont2")) + self.assertIsInstance(snode, SContainer) + pnode = snode.parsed() + self.assertIsInstance(pnode, PContainer) + pnode = next(iter(pnode)) + self.assertIsInstance(pnode, PUses) + + ref_pnode = next(pnode.refines()) + self.assertIsInstance(ref_pnode, PRefine) + self.assertEqual("cont3/leaf1", ref_pnode.nodeid()) + self.assertIsNone(ref_pnode.description()) + self.assertIsNone(ref_pnode.reference()) + self.assertIsNone(next(ref_pnode.if_features(), None)) + self.assertIsNone(next(ref_pnode.musts(), None)) + self.assertIsNone(ref_pnode.presence()) + self.assertIsNone(next(ref_pnode.defaults(), None)) + self.assertEqual(0, ref_pnode.min_elements()) + self.assertIsNone(ref_pnode.max_elements()) + self.assertIsNone(next(ref_pnode.extensions(), None)) + + aug_pnode = next(pnode.augments()) + self.assertIsInstance(aug_pnode, PAugment) + self.assertIsNotNone(next(iter(aug_pnode))) + self.assertIsNone(aug_pnode.when_condition()) + self.assertIsNone(next(aug_pnode.actions(), None)) + self.assertIsNone(next(aug_pnode.notifications(), None)) + + +# ------------------------------------------------------------------------------------- +class GroupingTest(unittest.TestCase): + def setUp(self): + self.ctx = Context(YANG_DIR) + + def tearDown(self): + self.ctx.destroy() + self.ctx = None + + def test_grouping_parsed(self): + mod = self.ctx.load_module("yolo-nodetypes") + pnode = next(mod.groupings()) + self.assertIsInstance(pnode, PGrouping) + self.assertIsNone(next(pnode.typedefs(), None)) + self.assertIsNone(next(pnode.groupings(), None)) + child = next(iter(pnode)) + self.assertIsNotNone(child) + self.assertIsNone(next(pnode.actions(), None)) + self.assertIsNone(next(pnode.notifications(), None)) + # ------------------------------------------------------------------------------------- class ListTest(unittest.TestCase): @@ -363,6 +453,25 @@ def test_list_min_max(self): self.assertEqual(list2.min_elements(), 0) self.assertEqual(list2.max_elements(), None) + def test_list_parsed(self): + list1 = next(self.ctx.find_path("/yolo-nodetypes:conf/list1")) + self.assertIsInstance(list1, SList) + pnode = list1.parsed() + self.assertIsInstance(pnode, PList) + self.assertIsNone(next(pnode.musts(), None)) + self.assertIsNone(pnode.when_condition()) + self.assertEqual("leaf1", pnode.key()) + self.assertIsNone(next(pnode.typedefs(), None)) + self.assertIsNone(next(pnode.groupings(), None)) + child = next(iter(pnode)) + self.assertIsInstance(child, PLeaf) + self.assertIsNone(next(pnode.actions(), None)) + self.assertIsNone(next(pnode.notifications(), None)) + self.assertEqual("leaf2 leaf3", next(pnode.uniques())) + self.assertEqual(2, pnode.min_elements()) + self.assertEqual(10, pnode.max_elements()) + self.assertFalse(pnode.ordered()) + # ------------------------------------------------------------------------------------- class RpcTest(unittest.TestCase): @@ -398,6 +507,21 @@ def test_rpc_params(self): def test_rpc_no_parent(self): self.assertIsNone(self.rpc.parent()) + def test_rpc_parsed(self): + self.assertIsInstance(self.rpc, SRpc) + pnode = self.rpc.parsed() + self.assertIsInstance(pnode, PAction) + self.assertIsNone(next(pnode.typedefs(), None)) + self.assertIsNone(next(pnode.groupings(), None)) + pnode2 = pnode.input() + self.assertIsInstance(pnode2, PActionInOut) + self.assertIsInstance(pnode.output(), PActionInOut) + self.assertIsNone(next(pnode2.musts(), None)) + self.assertIsNone(next(pnode2.typedefs(), None)) + self.assertIsNone(next(pnode2.groupings(), None)) + pnode3 = next(iter(pnode2)) + self.assertIsInstance(pnode3, PLeaf) + # ------------------------------------------------------------------------------------- class LeafTypeTest(unittest.TestCase): @@ -536,6 +660,28 @@ def test_leaf_type_require_instance(self): self.assertIsInstance(t, Type) self.assertFalse(t.require_instance()) + def test_leaf_type_parsed(self): + leaf = next(self.ctx.find_path("/yolo-system:conf/yolo-system:hostname")) + self.assertIsInstance(leaf, SLeaf) + t = leaf.type() + self.assertIsInstance(t, Type) + pnode = t.parsed() + self.assertIsInstance(pnode, PType) + self.assertEqual("types:host", pnode.name()) + self.assertIsNone(pnode.range()) + self.assertIsNone(pnode.length()) + self.assertIsNone(next(pnode.patterns(), None)) + self.assertIsNone(next(pnode.enums(), None)) + self.assertIsNone(next(pnode.bits(), None)) + self.assertIsNone(pnode.path()) + self.assertIsNone(next(pnode.bases(), None)) + self.assertIsNone(next(pnode.types(), None)) + self.assertIsNone(next(pnode.extensions(), None)) + self.assertIsNotNone(pnode.pmod()) + self.assertIsNone(pnode.compiled()) + self.assertEqual(0, pnode.fraction_digits()) + self.assertFalse(pnode.require_instance()) + # ------------------------------------------------------------------------------------- class LeafTest(unittest.TestCase): @@ -560,6 +706,40 @@ def test_leaf_default(self): leaf = next(self.ctx.find_path("/yolo-nodetypes:conf/percentage")) self.assertIsInstance(leaf.default(), float) + def test_leaf_parsed(self): + leaf = next(self.ctx.find_path("/yolo-nodetypes:conf/percentage")) + self.assertIsInstance(leaf, SLeaf) + pnode = leaf.parsed() + self.assertIsInstance(pnode, PLeaf) + must = next(pnode.musts()) + self.assertIsInstance(must, Must) + self.assertEqual(must.error_message(), "ERROR1") + must = next(leaf.must_conditions()) + self.assertIsInstance(must, str) + self.assertIsNone(pnode.when_condition()) + self.assertIsInstance(pnode.type(), PType) + self.assertIsNone(pnode.units()) + self.assertEqual("10.2", pnode.default()) + self.assertFalse(pnode.is_key()) + + # test basic PNode settings + self.assertIsNotNone(pnode.parent()) + self.assertEqual(PNode.LEAF, pnode.nodetype()) + self.assertIsNotNone(next(pnode.siblings())) + self.assertEqual("", repr(pnode)) + self.assertIsNone(pnode.description()) + self.assertIsNone(pnode.reference()) + self.assertIsNone(next(pnode.if_features(), None)) + self.assertIsNone(next(pnode.extensions(), None)) + self.assertFalse(pnode.config_set()) + self.assertFalse(pnode.config_false()) + self.assertFalse(pnode.mandatory()) + self.assertFalse(pnode.deprecated()) + self.assertFalse(pnode.obsolete()) + self.assertEqual("current", pnode.status()) + + NODETYPE_CLASS = {} + # ------------------------------------------------------------------------------------- class LeafListTest(unittest.TestCase): @@ -587,6 +767,20 @@ def test_leaf_list_min_max(self): self.assertEqual(leaflist2.min_elements(), 0) self.assertEqual(leaflist2.max_elements(), None) + def test_leaf_list_parsed(self): + leaflist = next(self.ctx.find_path("/yolo-nodetypes:conf/ratios")) + self.assertIsInstance(leaflist, SLeafList) + pnode = leaflist.parsed() + self.assertIsInstance(pnode, PLeafList) + self.assertIsNone(next(pnode.musts(), None)) + self.assertIsNone(pnode.when_condition()) + self.assertIsInstance(pnode.type(), PType) + self.assertIsNone(pnode.units()) + self.assertEqual("2.5", next(pnode.defaults())) + self.assertEqual(0, pnode.min_elements()) + self.assertIsNone(pnode.max_elements()) + self.assertFalse(pnode.ordered()) + # ------------------------------------------------------------------------------------- class ChoiceTest(unittest.TestCase): @@ -603,3 +797,60 @@ def test_choice_default(self): choice = next(conf.children((SNode.CHOICE,), with_choice=True)) self.assertIsInstance(choice, SChoice) self.assertIsInstance(choice.default(), SCase) + + def test_choice_parsed(self): + conf = next(self.ctx.find_path("/yolo-system:conf")) + choice = next(conf.children((SNode.CHOICE,), with_choice=True)) + self.assertIsInstance(choice, SChoice) + pnode = choice.parsed() + self.assertIsInstance(pnode, PChoice) + + case_pnode = next(iter(pnode)) + self.assertIsInstance(case_pnode, PCase) + self.assertIsNotNone(next(iter(case_pnode))) + self.assertIsNone(case_pnode.when_condition()) + + self.assertIsNone(pnode.when_condition()) + self.assertEqual("red", pnode.default()) + + +# ------------------------------------------------------------------------------------- +class AnydataTest(unittest.TestCase): + def setUp(self): + self.ctx = Context(YANG_DIR) + self.ctx.load_module("yolo-nodetypes") + + def tearDown(self): + self.ctx.destroy() + self.ctx = None + + def test_anydata_parsed(self): + snode = next(self.ctx.find_path("/yolo-nodetypes:any1")) + self.assertIsInstance(snode, SAnydata) + pnode = snode.parsed() + self.assertIsInstance(pnode, PAnydata) + self.assertIsNone(next(pnode.musts(), None)) + self.assertEqual("../cont2", pnode.when_condition()) + + +# ------------------------------------------------------------------------------------- +class NotificationTest(unittest.TestCase): + def setUp(self): + self.ctx = Context(YANG_DIR) + self.ctx.load_module("yolo-nodetypes") + + def tearDown(self): + self.ctx.destroy() + self.ctx = None + + def test_notification_parsed(self): + snode = next(self.ctx.find_path("/yolo-nodetypes:cont2")) + self.assertIsInstance(snode, SContainer) + pnode = snode.parsed() + self.assertIsInstance(pnode, PContainer) + pnode = next(pnode.notifications()) + self.assertIsInstance(pnode, PNotif) + self.assertIsNone(next(pnode.musts(), None)) + self.assertIsNone(next(pnode.typedefs(), None)) + self.assertIsNone(next(pnode.groupings(), None)) + self.assertIsNotNone(next(iter(pnode))) diff --git a/tests/yang/yolo/yolo-nodetypes.yang b/tests/yang/yolo/yolo-nodetypes.yang index a456ae1d..c2690dc9 100644 --- a/tests/yang/yolo/yolo-nodetypes.yang +++ b/tests/yang/yolo/yolo-nodetypes.yang @@ -81,4 +81,35 @@ module yolo-nodetypes { leaf test1 { type uint8; } + + grouping grp1 { + container cont3 { + leaf leaf1 { + type string; + } + } + } + + container cont2 { + presence "special container enabled"; + uses grp1 { + refine cont3/leaf1 { + mandatory true; + } + augment cont3 { + leaf leaf2 { + type int8; + } + } + } + notification interface-enabled { + leaf by-user { + type string; + } + } + } + + anydata any1 { + when "../cont2"; + } }