diff --git a/AUTHORS.rst b/AUTHORS.rst index 7661235..d094a38 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -9,4 +9,4 @@ contributors: - `Hynek Schlawack `_ - `Matt Montag `_ - `Pierre Tardy `_ -- `Pi Delport `_ \ No newline at end of file +- `Pi Delport `_ diff --git a/LICENSE b/LICENSE index d645695..02efdb0 100644 --- a/LICENSE +++ b/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2014-2022 Spotify AB - Discontinued April 2022 and transferred to new maintainer JUAN DIEGO DE LA CRUZ PECHO Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.rst b/README.rst index 67407c5..84ce6b1 100644 --- a/README.rst +++ b/README.rst @@ -1,10 +1,6 @@ ramlfications: RAML reference implementation in Python ====================================================== -.. image:: https://img.shields.io/travis/spotify/ramlfications.svg?style=flat-square - :target: https://travis-ci.org/spotify/ramlfications - :alt: CI status - .. image:: https://img.shields.io/pypi/v/ramlfications.svg?style=flat-square :target: https://pypi.python.org/pypi/ramlfications/ :alt: Latest Version @@ -28,6 +24,9 @@ ramlfications: RAML reference implementation in Python .. begin +Note: this project has been discontinued at Spotify and will be transferred to a new maintainer, we are currently finishing the transfer of this repository and the associated pypi package. + + Requirements and Installation ============================= @@ -106,6 +105,13 @@ or:: Then within ``ramlfications/docs/_build`` you can open the index.html page in your browser. +Project History +^^^^^^^^^^^^^^^ + +Ramlfications was originally created by Spotify engineer github.com/econchick, but is currently not in use at Spotify. The project was discontinued +in April 2022 and transferred to an external maintainer. + + Still have issues? ^^^^^^^^^^^^^^^^^^ diff --git a/ramlfications/__init__.py b/ramlfications/__init__.py index 81deda4..e286c92 100644 --- a/ramlfications/__init__.py +++ b/ramlfications/__init__.py @@ -5,6 +5,7 @@ from ramlfications.config import setup_config from ramlfications.parser import parse_raml + from ramlfications.utils import load_file, load_string diff --git a/ramlfications/_helpers.py b/ramlfications/_helpers.py new file mode 100644 index 0000000..a198e77 --- /dev/null +++ b/ramlfications/_helpers.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 Spotify AB + +import os +import sys + +import six + +from .errors import LoadRAMLError +from .loader import RAMLLoader + +if sys.version_info[0] == 2: + from io import open + + +def load_file(raml_file): + try: + with _get_raml_object(raml_file) as raml: + return RAMLLoader().load(raml) + except IOError as e: + raise LoadRAMLError(e) + + +def load_string(raml_str): + return RAMLLoader().load(raml_str) + + +def _get_raml_object(raml_file): + """ + Returns a file object. + """ + if raml_file is None: + msg = "RAML file can not be 'None'." + raise LoadRAMLError(msg) + + if isinstance(raml_file, six.text_type) or isinstance( + raml_file, bytes): + return open(os.path.abspath(raml_file), 'r', encoding="UTF-8") + elif hasattr(raml_file, 'read'): + return raml_file + else: + msg = ("Can not load object '{0}': Not a basestring type or " + "file object".format(raml_file)) + raise LoadRAMLError(msg) diff --git a/ramlfications/loader.py b/ramlfications/loader.py index ae25442..6415778 100644 --- a/ramlfications/loader.py +++ b/ramlfications/loader.py @@ -1,7 +1,14 @@ # -*- coding: utf-8 -*- # Copyright (c) 2015 Spotify AB +<<<<<<< HEAD from __future__ import absolute_import, division, print_function +======= +try: + from collections import OrderedDict +except ImportError: # pragma: no cover + from ordereddict import OrderedDict +>>>>>>> master import os import jsonref @@ -21,6 +28,9 @@ RAML10_FRAGMENT_TYPES = ("DataType", "AnnotationType") +__all__ = ["RAMLLoader"] + + class RAMLLoader(object): """ Extends YAML loader to load RAML files with ``!include`` tags. diff --git a/ramlfications/parser.py b/ramlfications/parser.py new file mode 100644 index 0000000..3cdc198 --- /dev/null +++ b/ramlfications/parser.py @@ -0,0 +1,1114 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 Spotify AB + +from __future__ import absolute_import, division, print_function +import re + +import attr +from six import iteritems, iterkeys, itervalues + + +from .config import MEDIA_TYPES +from .errors import InvalidRAMLError +from .parameters import ( + Documentation, Header, Body, Response, URIParameter, QueryParameter, + FormParameter, SecurityScheme +) +from .parser_utils import ( + security_schemes +) +from .raml import RootNode, ResourceNode, ResourceTypeNode, TraitNode +from .utils import ( + load_schema, _resource_type_lookup, + _get_resource_type, _get_trait, _get_attribute, + _get_inherited_attribute, _remove_duplicates, _create_uri_params, + _get, _create_base_param_obj, _get_res_type_attribute, + _get_inherited_type_params, _get_inherited_item, _get_attribute_dict, + get_inherited, set_param_object, set_params, _get_data_union, + _preserve_uri_order +) + + +__all__ = ["parse_raml"] + + +def parse_raml(loaded_raml, config): + """ + Parse loaded RAML file into RAML/Python objects. + + :param RAMLDict loaded_raml: OrderedDict of loaded RAML file + :returns: :py:class:`.raml.RootNode` object. + :raises: :py:class:`.errors.InvalidRAMLError` when RAML file is invalid + """ + + validate = str(_get(config, "validate")).lower() == 'true' + + # Postpone validating the root node until the end; otherwise, + # we end up with duplicate validation exceptions. + attr.set_run_validators(False) + root = create_root(loaded_raml, config) + attr.set_run_validators(validate) + + root.security_schemes = create_sec_schemes(root.raml_obj, root) + root.traits = create_traits(root.raml_obj, root) + root.resource_types = create_resource_types(root.raml_obj, root) + root.resources = create_resources(root.raml_obj, [], root, + parent=None) + + if validate: + attr.validate(root) # need to validate again for root node + + if root.errors: + raise InvalidRAMLError(root.errors) + + return root + + +def create_root(raml, config): + """ + Creates a Root Node based off of the RAML's root section. + + :param RAMLDict raml: loaded RAML file + :returns: :py:class:`.raml.RootNode` object with API root attributes set + """ + + errors = [] + + def protocols(): + explicit_protos = _get(raml, "protocols") + implicit_protos = re.findall(r"(https|http)", base_uri()) + implicit_protos = [p.upper() for p in implicit_protos] + + return explicit_protos or implicit_protos or None + + def base_uri(): + base_uri = _get(raml, "baseUri", "") + if "{version}" in base_uri: + base_uri = base_uri.replace("{version}", + str(_get(raml, "version"))) + return base_uri + + def base_uri_params(): + data = _get(raml, "baseUriParameters", {}) + params = _create_base_param_obj(data, URIParameter, config, errors) + uri = _get(raml, "baseUri", "") + declared = _get(raml, "uriParameters", {}) + declared = list(iterkeys(declared)) + return _preserve_uri_order(uri, params, config, errors, declared) + + def uri_params(): + data = _get(raml, "uriParameters", {}) + params = _create_base_param_obj(data, URIParameter, config, errors) + uri = _get(raml, "baseUri", "") + + declared = [] + base = base_uri_params() + if base: + declared = [p.name for p in base] + return _preserve_uri_order(uri, params, config, errors, declared) + + def docs(): + d = _get(raml, "documentation", []) + assert isinstance(d, list), "Error parsing documentation" + docs = [Documentation(_get(i, "title"), _get(i, "content")) for i in d] + return docs or None + + def schemas(): + _schemas = _get(raml, "schemas") + if not _schemas: + return None + schemas = [] + for schema in _schemas: + value = load_schema(list(itervalues(schema))[0]) + schemas.append({list(iterkeys(schema))[0]: value}) + return schemas or None + + return RootNode( + raml_obj=raml, + raw=raml, + title=_get(raml, "title"), + version=_get(raml, "version"), + protocols=protocols(), + base_uri=base_uri(), + base_uri_params=base_uri_params(), + uri_params=uri_params(), + media_type=_get(raml, "mediaType"), + documentation=docs(), + schemas=schemas(), + config=config, + secured_by=_get(raml, "securedBy"), + errors=errors + ) + + +def create_sec_schemes(raml_data, root): + """ + Parse security schemes into ``SecurityScheme`` objects + + :param dict raml_data: Raw RAML data + :param RootNode root: Root Node + :returns: list of :py:class:`.parameters.SecurityScheme` objects + """ + def _map_object_types(item): + return { + "headers": headers, + "body": body, + "responses": responses, + "queryParameters": query_params, + "uriParameters": uri_params, + "formParameters": form_params, + "usage": usage, + "mediaType": media_type, + "protocols": protocols, + "documentation": documentation, + }[item] + + def headers(header_data): + _headers = [] + header_data = _get(header_data, "headers", {}) + for k, v in list(iteritems(header_data)): + h = _create_base_param_obj({k: v}, + Header, + root.config, + root.errors) + _headers.extend(h) + return _headers + + def body(body_data): + body_data = _get(body_data, "body", {}) + _body = [] + for k, v in list(iteritems(body_data)): + body = Body( + mime_type=k, + raw=v, + schema=load_schema(_get(v, "schema")), + example=load_schema(_get(v, "example")), + form_params=_get(v, "formParameters"), + config=root.config, + errors=root.errors + ) + _body.append(body) + return _body + + def responses(resp_data): + _resps = [] + resp_data = _get(resp_data, "responses", {}) + for k, v in list(iteritems(resp_data)): + response = Response( + code=k, + raw=v, + desc=_get(v, "description"), + headers=headers(_get(v, "headers", {})), + body=body(_get(v, "body", {})), + config=root.config, + errors=root.errors + ) + _resps.append(response) + return sorted(_resps, key=lambda x: x.code) + + def query_params(param_data): + param_data = _get(param_data, "queryParameters", {}) + _params = [] + for k, v in list(iteritems(param_data)): + p = _create_base_param_obj({k: v}, + QueryParameter, + root.config, + root.errors) + _params.extend(p) + return _params + + def uri_params(param_data): + param_data = _get(param_data, "uriParameters") + _params = [] + for k, v in list(iteritems(param_data)): + p = _create_base_param_obj({k: v}, + URIParameter, + root.config, + root.errors) + _params.extend(p) + return _params + + def form_params(param_data): + param_data = _get(param_data, "formParameters", {}) + _params = [] + for k, v in list(iteritems(param_data)): + p = _create_base_param_obj({k: v}, + FormParameter, + root.config, + root.errors) + _params.extend(p) + return _params + + def usage(desc_by_data): + return _get(desc_by_data, "usage") + + def media_type(desc_by_data): + return _get(desc_by_data, "mediaType") + + def protocols(desc_by_data): + return _get(desc_by_data, "protocols") + + def documentation(desc_by_data): + d = _get(desc_by_data, "documentation", []) + assert isinstance(d, list), "Error parsing documentation" + docs = [Documentation(_get(i, "title"), _get(i, "content")) for i in d] + return docs or None + + def set_property(node, obj, node_data): + func = _map_object_types(obj) + item_objs = func({obj: node_data}) + setattr(node, func.__name__, item_objs) + + def initial_wrap(key, data): + return SecurityScheme( + name=key, + raw=data, + type=_get(data, "type"), + described_by=_get(data, "describedBy", {}), + desc=_get(data, "description"), + settings=_get(data, "settings"), + config=root.config, + errors=root.errors + ) + + def final_wrap(node): + for obj, node_data in list(iteritems(node.described_by)): + set_property(node, obj, node_data) + return node + + schemes = _get(raml_data, "securitySchemes", []) + scheme_objs = [] + for s in schemes: + name = list(iterkeys(s))[0] + data = list(itervalues(s))[0] + node = initial_wrap(name, data) + node = final_wrap(node) + scheme_objs.append(node) + return scheme_objs or None + + +def create_traits(raml_data, root): + """ + Parse traits into ``Trait`` objects. + + :param dict raml_data: Raw RAML data + :param RootNode root: Root Node + :returns: list of :py:class:`.raml.TraitNode` objects + """ + def description(): + return _get(data, "description") + + def protocols(): + return _get(data, "protocols") + + def query_params(): + return set_param_object(data, "queryParameters", root) + + def uri_params(): + return set_param_object(data, "uriParameters", root) + + def form_params(): + return set_param_object(data, "formParameters", root) + + def base_uri_params(): + return set_param_object(data, "baseUriParameters", root) + + def headers(data): + return set_param_object(data, "headers", root) + + def body(data): + body = _get(data, "body", {}) + body_objects = [] + for key, value in list(iteritems(body)): + body = Body( + mime_type=key, + raw=value, + schema=load_schema(_get(value, "schema")), + example=load_schema(_get(value, "example")), + form_params=_get(value, "formParameters"), + config=root.config, + errors=root.errors + ) + body_objects.append(body) + return body_objects or None + + def responses(): + response_objects = [] + for key, value in list(iteritems(_get(data, "responses", {}))): + response = Response( + code=key, + raw=value, + desc=_get(value, "description"), + headers=headers(value), + body=body(value), + config=root.config, + errors=root.errors + ) + response_objects.append(response) + return sorted(response_objects, key=lambda x: x.code) or None + + def wrap(key, data): + return TraitNode( + name=key, + raw=data, + root=root, + query_params=query_params(), + uri_params=uri_params(), + form_params=form_params(), + base_uri_params=base_uri_params(), + headers=headers(data), + body=body(data), + responses=responses(), + desc=description(), + media_type=_get(data, "mediaType"), + usage=_get(data, "usage"), + protocols=protocols(), + errors=root.errors + ) + + traits = _get(raml_data, "traits", []) + trait_objects = [] + for trait in traits: + name = list(iterkeys(trait))[0] + data = list(itervalues(trait))[0] + trait_objects.append(wrap(name, data)) + return trait_objects or None + + +def create_resource_types(raml_data, root): + """ + Parse resourceTypes into ``ResourceTypeNode`` objects. + + :param dict raml_data: Raw RAML data + :param RootNode root: Root Node + :returns: list of :py:class:`.raml.ResourceTypeNode` objects + """ + # TODO: move this outside somewhere - config? + accepted_methods = _get(root.config, "http_optional") + + ##### + # Set ResourceTypeNode attributes + ##### + + def headers(data): + _headers = _get(data, "headers", {}) + if _get(v, "type"): + _headers = _get_inherited_item(_headers, "headers", + resource_types, + meth, v) + + header_objs = _create_base_param_obj(_headers, + Header, + root.config, + root.errors) + if header_objs: + for h in header_objs: + h.method = method(meth) + + return header_objs + + def body(data): + _body = _get(data, "body", default={}) + if _get(v, "type"): + _body = _get_inherited_item(_body, "body", resource_types, + meth, v) + + body_objects = [] + for key, value in list(iteritems(_body)): + body = Body( + mime_type=key, + raw=value, + schema=load_schema(_get(value, "schema")), + example=load_schema(_get(value, "example")), + form_params=_get(value, "formParameters"), + config=root.config, + errors=root.errors + ) + body_objects.append(body) + return body_objects or None + + def responses(data): + response_objects = [] + _responses = _get(data, "responses", {}) + if _get(v, "type"): + _responses = _get_inherited_item(_responses, "responses", + resource_types, meth, v) + + for key, value in list(iteritems(_responses)): + _headers = _get(_get(data, "responses", {}), key, {}) + _headers = _get(_headers, "headers", {}) + header_objs = _create_base_param_obj(_headers, Header, + root.config, root.errors) + if header_objs: + for h in header_objs: + h.method = method(meth) + response = Response( + code=key, + raw={key: value}, + desc=_get(value, "description"), + headers=header_objs, + body=body(value), + config=root.config, + method=method(meth), + errors=root.errors + ) + response_objects.append(response) + if response_objects: + return sorted(response_objects, key=lambda x: x.code) + return None + + def uri_params(data): + uri_params = _get_attribute_dict(data, "uriParameters", v) + + if _get(v, "type"): + uri_params = _get_inherited_type_params(v, "uriParameters", + uri_params, resource_types) + return _create_base_param_obj(uri_params, + URIParameter, + root.config, + root.errors) + + def base_uri_params(data): + uri_params = _get_attribute_dict(data, "baseUriParameters", v) + + return _create_base_param_obj(uri_params, + URIParameter, + root.config, + root.errors) + + def query_params(data): + query_params = _get_attribute_dict(data, "queryParameters", v) + + if _get(v, "type"): + query_params = _get_inherited_type_params(v, "queryParameters", + query_params, + resource_types) + + return _create_base_param_obj(query_params, + QueryParameter, + root.config, + root.errors) + + def form_params(data): + form_params = _get_attribute_dict(data, "formParameters", v) + + if _get(v, "type"): + form_params = _get_inherited_type_params(v, "formParameters", + form_params, + resource_types) + + return _create_base_param_obj(form_params, + FormParameter, + root.config, + root.errors) + + def description(): + # prefer the resourceType method description + if meth: + method_attr = _get(v, meth) + desc = _get(method_attr, "description") + return desc or _get(v, "description") + return _get(v, "description") + + def type_(): + return _get(v, "type") + + def method(meth): + if not meth: + return None + if "?" in meth: + return meth[:-1] + return meth + + def optional(): + if meth: + return "?" in meth + + def protocols(data): + m, r = _get_res_type_attribute(v, data, "protocols", None) + return m or r or root.protocols + + def is_(data): + m, r = _get_res_type_attribute(v, data, "is", default=[]) + return m + r or None + + def traits(data): + assigned = is_(data) + if assigned: + if root.traits: + trait_objs = [] + for trait in assigned: + obj = [t for t in root.traits if t.name == trait] + if obj: + trait_objs.append(obj[0]) + return trait_objs or None + + def secured_by(data): + m, r = _get_res_type_attribute(v, data, "securedBy", []) + return m + r or None + + def security_schemes_(data): + secured = secured_by(data) + return security_schemes(secured, root) + + def wrap(key, data, meth, _v): + return ResourceTypeNode( + name=key, + raw=data, + root=root, + headers=headers(data), + body=body(data), + responses=responses(data), + uri_params=uri_params(data), + base_uri_params=base_uri_params(data), + query_params=query_params(data), + form_params=form_params(data), + media_type=_get(v, "mediaType"), + desc=description(), + type=type_(), + method=method(meth), + usage=_get(v, "usage"), + optional=optional(), + is_=is_(data), + traits=traits(data), + secured_by=secured_by(data), + security_schemes=security_schemes_(data), + display_name=_get(data, "displayName", key), + protocols=protocols(data), + errors=root.errors + ) + + resource_types = _get(raml_data, "resourceTypes", []) + resource_type_objects = [] + child_res_type_objects = [] + child_res_type_names = [] + + for res in resource_types: + for k, v in list(iteritems(res)): + if isinstance(v, dict): + if "type" in list(iterkeys(v)): + child_res_type_objects.append({k: v}) + child_res_type_names.append(k) + + else: + for meth in list(iterkeys(v)): + if meth in accepted_methods: + method_data = _get(v, meth, {}) + resource = wrap(k, method_data, meth, v) + resource_type_objects.append(resource) + else: + meth = None + resource = wrap(k, {}, meth, v) + resource_type_objects.append(resource) + + while child_res_type_objects: + child = child_res_type_objects.pop() + name = list(iterkeys(child))[0] + data = list(itervalues(child))[0] + parent = data.get("type") + if parent in child_res_type_names: + continue + p_data = [r for r in resource_types if list(iterkeys(r))[0] == parent] + p_data = p_data[0].get(parent) + res_data = _get_data_union(data, p_data) + + for meth in list(iterkeys(res_data)): + if meth in accepted_methods: + method_data = _get(res_data, meth, {}) + comb_data = dict(list(iteritems(method_data)) + + list(iteritems(res_data))) + resource = ResourceTypeNode( + name=name, + raw=res_data, + root=root, + headers=headers(method_data), + body=body(method_data), + responses=responses(method_data), + uri_params=uri_params(comb_data), + base_uri_params=base_uri_params(comb_data), + query_params=query_params(method_data), + form_params=form_params(method_data), + media_type=_get(v, "mediaType"), + desc=description(), + type=_get(res_data, "type"), + method=method(meth), + usage=_get(res_data, "usage"), + optional=optional(), + is_=is_(res_data), + traits=traits(res_data), + secured_by=secured_by(res_data), + security_schemes=security_schemes_(res_data), + display_name=_get(method_data, "displayName", name), + protocols=protocols(res_data), + errors=root.errors + ) + resource_type_objects.append(resource) + + return resource_type_objects or None + + +def create_resources(node, resources, root, parent): + """ + Recursively traverses the RAML file via DFS to find each resource + endpoint. + + :param dict node: Dictionary of node to traverse + :param list resources: List of collected ``ResourceNode`` s + :param RootNode root: The ``RootNode`` of the API + :param ResourceNode parent: Parent ``ResourceNode`` of current ``node`` + :returns: List of :py:class:`.raml.ResourceNode` objects. + """ + for k, v in list(iteritems(node)): + if k.startswith("/"): + avail = _get(root.config, "http_optional") + methods = [m for m in avail if m in list(iterkeys(v))] + if "type" in list(iterkeys(v)): + assigned = _resource_type_lookup(_get(v, "type"), root) + if hasattr(assigned, "method"): + if not assigned.optional: + methods.append(assigned.method) + methods = list(set(methods)) + if methods: + for m in methods: + child = create_node(name=k, + raw_data=v, + method=m, + parent=parent, + root=root) + resources.append(child) + # inherit resource type methods + elif "type" in list(iterkeys(v)): + if hasattr(assigned, "method"): + method = assigned.method + else: + method = None + child = create_node(name=k, + raw_data=v, + method=method, + parent=parent, + root=root) + resources.append(child) + else: + child = create_node(name=k, + raw_data=v, + method=None, + parent=parent, + root=root) + resources.append(child) + resources = create_resources(child.raw, resources, root, child) + return resources + + +def create_node(name, raw_data, method, parent, root): + """ + Create a Resource Node object. + + :param str name: Name of resource node + :param dict raw_data: Raw RAML data associated with resource node + :param str method: HTTP method associated with resource node + :param ResourceNode parent: Parent node object of resource node, if any + :param RootNode api: API ``RootNode`` that the resource node is attached to + :returns: :py:class:`.raml.ResourceNode` object + """ + ##### + # Node attribute functions + ##### + def path(): + """Set resource's relative URI path.""" + parent_path = "" + if parent: + parent_path = parent.path + return parent_path + name + + def absolute_uri(): + """Set resource's absolute URI path.""" + uri = root.base_uri + path() + proto = protocols() + if proto: + uri = uri.split("://") + if len(uri) == 2: + uri = uri[1] + if root.protocols: + _proto = list(set(root.protocols) & set(proto)) + # if resource protocols and root protocols share a protocol + # then use that one + if _proto: + uri = _proto[0].lower() + "://" + uri + # if no shared protocols, use the first of the resource + # protocols + else: + uri = proto[0].lower() + "://" + uri + return uri + + def protocols(): + """Set resource's supported protocols.""" + # trait = _get_trait("protocols", root, is_()) + kwargs = dict(root=root, + is_=is_(), + type_=type_(), + method=method, + data=raw_data, + parent=parent) + objects_to_inherit = [ + "traits", "types", "method", "resource", "parent" + ] + inherited = get_inherited("protocols", objects_to_inherit, **kwargs) + trait = inherited["traits"] + r_type = inherited["types"] + meth = inherited["method"] + res = inherited["resource"] + parent_ = inherited["parent"] + default = [root.base_uri.split("://")[0].upper()] + + return meth or r_type or trait or res or parent_ or default + + def headers(): + """Set resource's supported headers.""" + headers = _get_attribute("headers", method, raw_data) + header_objs = _get_inherited_attribute("headers", root, type_(), + method, is_()) + + _headers = _create_base_param_obj(headers, + Header, + root.config, + root.errors, + method=method) + if _headers is None: + return header_objs or None + return _remove_duplicates(header_objs, _headers) + + def body(): + """Set resource's supported request/response body.""" + bodies = _get_attribute("body", method, raw_data) + body_objects = _get_inherited_attribute("body", root, type_(), + method, is_()) + + _body_objs = [] + for k, v in list(iteritems(bodies)): + if v is None: + continue + body = Body( + mime_type=k, + raw={k: v}, + schema=load_schema(_get(v, "schema")), + example=load_schema(_get(v, "example")), + form_params=_get(v, "formParameters"), + config=root.config, + errors=root.errors + ) + _body_objs.append(body) + if _body_objs == []: + return body_objects or None + return _remove_duplicates(body_objects, _body_objs) + + def responses(): + """Set resource's expected responses.""" + def resp_headers(headers): + """Set response headers.""" + header_objs = [] + for k, v in list(iteritems(headers)): + header = Header( + name=k, + display_name=_get(v, "displayName", default=k), + method=method, + raw=headers, + type=_get(v, "type", default="string"), + desc=_get(v, "description"), + example=_get(v, "example"), + default=_get(v, "default"), + minimum=_get(v, "minimum"), + maximum=_get(v, "maximum"), + min_length=_get(v, "minLength"), + max_length=_get(v, "maxLength"), + enum=_get(v, "enum"), + repeat=_get(v, "repeat", default=False), + pattern=_get(v, "pattern"), + config=root.config, + errors=root.errors + ) + header_objs.append(header) + return header_objs or None + + def resp_body(body): + """Set response body.""" + body_list = [] + default_body = {} + for (key, spec) in body.items(): + if key not in MEDIA_TYPES: + # if a root mediaType was defined, the response body + # may omit the mime_type definition + if key in ('schema', 'example'): + default_body[key] = load_schema(spec) if spec else {} + else: + mime_type = key + # spec might be '!!null' + raw = spec or body + _schema = {} + _example = {} + if spec: + _schema_spec = _get(spec, 'schema', '') + _example_spec = _get(spec, 'example', '') + if _schema_spec: + _schema = load_schema(_schema_spec) + if _example_spec: + _example = load_schema(_example_spec) + body_list.append(Body( + mime_type=mime_type, + raw=raw, + schema=_schema, + example=_example, + form_params=None, + config=root.config, + errors=root.errors + )) + if default_body: + body_list.append(Body( + mime_type=root.media_type, + raw=body, + schema=_get(default_body, 'schema'), + example=_get(default_body, 'example'), + form_params=None, + config=root.config, + errors=root.errors + )) + + return body_list or None + + resps = _get_attribute("responses", method, raw_data) + type_resp = _get_resource_type("responses", root, type_(), method) + trait_resp = _get_trait("responses", root, is_()) + resp_objs = type_resp + trait_resp + resp_codes = [r.code for r in resp_objs] + for k, v in list(iteritems(resps)): + if k in resp_codes: + resp = [r for r in resp_objs if r.code == k][0] + index = resp_objs.index(resp) + inherit_resp = resp_objs.pop(index) + headers = resp_headers(_get(v, "headers", default={})) + if inherit_resp.headers: + headers = _remove_duplicates(inherit_resp.headers, headers) + # if headers: + # headers.extend(inherit_resp.headers) + # else: + # headers = inherit_resp.headers + body = resp_body(_get(v, "body", {})) + if inherit_resp.body: + body = _remove_duplicates(inherit_resp.body, body) + # if body: + # body.extend(inherit_resp.body) + # else: + # body = inherit_resp.body + resp = Response( + code=k, + raw={k: v}, # should prob get data union + method=method, + desc=_get(v, "description") or inherit_resp.desc, + headers=headers, + body=body, + config=root.config, + errors=root.errors + ) + resp_objs.insert(index, resp) # preserve order + else: + _headers = _get(v, "headers", default={}) + _body = _get(v, "body", default={}) + resp = Response( + code=k, + raw={k: v}, + method=method, + desc=_get(v, "description"), + headers=resp_headers(_headers), + body=resp_body(_body), + config=root.config, + errors=root.errors + ) + resp_objs.append(resp) + + return resp_objs or None + + def uri_params(): + """Set resource's URI parameters.""" + unparsed_attr = "uriParameters" + parsed_attr = "uri_params" + root_params = root.uri_params + params = _create_uri_params(unparsed_attr, parsed_attr, root_params, + root, type_(), is_(), method, raw_data, + parent) + declared = [] + base = base_uri_params() + if base: + declared = [p.name for p in base] + + return _preserve_uri_order(absolute_uri(), params, root.config, + root.errors, declared) + + def base_uri_params(): + """Set resource's base URI parameters.""" + root_params = root.base_uri_params + kw = dict(type=type_(), is_=is_(), root_params=root_params) + params = set_params(raw_data, "base_uri_params", root, method, + inherit=True, **kw) + declared = [] + uri = root.uri_params + base = root.base_uri_params + if uri: + declared = [p.name for p in uri] + if base: + declared.extend([p.name for p in base]) + return _preserve_uri_order(root.base_uri, params, root.config, + root.errors, declared) + + def query_params(): + kw = dict(type_=type_(), is_=is_()) + return set_params(raw_data, "query_params", root, method, + inherit=True, **kw) + + def form_params(): + """Set resource's form parameters.""" + kw = dict(type_=type_(), is_=is_()) + return set_params(raw_data, "form_params", root, method, + inherit=True, **kw) + + def media_type_(): + """Set resource's supported media types.""" + if method is None: + return None + kwargs = dict(root=root, + is_=is_(), + type_=type_(), + method=method, + data=raw_data) + objects_to_inherit = [ + "method", "traits", "types", "resource", "root" + ] + inherited = get_inherited("mediaType", objects_to_inherit, **kwargs) + meth = inherited.get("method") + trait = inherited.get("trait") + r_type = inherited.get("types") + res = inherited.get("resource") + root_ = inherited.get("root") + return meth or trait or r_type or res or root_ + + def description(): + """Set resource's description.""" + desc = _get(raw_data, "description") + try: + desc = _get(_get(raw_data, method), "description") + if desc is None: + raise AttributeError + except AttributeError: + if type_(): + assigned = _resource_type_lookup(type_(), root) + try: + if assigned.method == method: + desc = assigned.description.raw + except AttributeError: + pass + else: + desc = _get(raw_data, "description") + return desc + + def is_(): + """Set resource's assigned trait names.""" + is_list = [] + res_level = _get(raw_data, "is") + if res_level: + assert isinstance(res_level, list), "Error parsing trait" + is_list.extend(res_level) + method_level = _get(raw_data, method, {}) + if method_level: + method_level = _get(method_level, "is") + if method_level: + assert isinstance(method_level, list), "Error parsing trait" + is_list.extend(method_level) + return is_list or None + + def traits(): + """Set resource's assigned trait objects.""" + assigned = is_() + if assigned: + if root.traits: + trait_objs = [] + for trait in assigned: + obj = [t for t in root.traits if t.name == trait] + if obj: + trait_objs.append(obj[0]) + return trait_objs or None + + # TODO: wow this function sucks. + def type_(): + """Set resource's assigned resource type name.""" + __get_method = _get(raw_data, method, {}) + assigned_type = _get(__get_method, "type") + if assigned_type: + if not isinstance(assigned_type, dict): + return assigned_type + return list(iterkeys(assigned_type))[0] # NOCOV + + assigned_type = _get(raw_data, "type") + if isinstance(assigned_type, dict): + return list(iterkeys(assigned_type))[0] # NOCOV + return assigned_type + + def resource_type(): + """Set resource's assigned resource type objects.""" + if type_() and root.resource_types: + assigned_name = type_() + res_types = root.resource_types + type_obj = [r for r in res_types if r.name == assigned_name] + if type_obj: + return type_obj[0] + + def secured_by(): + """ + Set resource's assigned security scheme names and related paramters. + """ + if method is not None: + method_level = _get(raw_data, method, {}) + if method_level: + secured_by = _get(method_level, "securedBy") + if secured_by: + return secured_by + resource_level = _get(raw_data, "securedBy") + if resource_level: + return resource_level + root_level = root.secured_by + if root_level: + return root_level + + def security_schemes_(): + """Set resource's assigned security scheme objects.""" + secured = secured_by() + return security_schemes(secured, root) + + node = ResourceNode( + name=name, + raw=raw_data, + method=method, + parent=parent, + root=root, + display_name=_get(raw_data, "displayName", name), + path=path(), + absolute_uri=absolute_uri(), + protocols=protocols(), + headers=headers(), + body=body(), + responses=responses(), + uri_params=uri_params(), + base_uri_params=base_uri_params(), + query_params=query_params(), + form_params=form_params(), + media_type=media_type_(), + desc=description(), + is_=is_(), + traits=traits(), + type=type_(), + resource_type=resource_type(), + secured_by=secured_by(), + security_schemes=security_schemes_(), + errors=root.errors + ) + if resource_type(): + # correct inheritance (issue #23) + node._inherit_type() + return node diff --git a/ramlfications/utils.py b/ramlfications/utils.py new file mode 100644 index 0000000..3c3b6bb --- /dev/null +++ b/ramlfications/utils.py @@ -0,0 +1,733 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 Spotify AB + +from __future__ import absolute_import, division, print_function + + +import json +import logging +import os +import re +import sys + +try: + from collections import OrderedDict +except ImportError: # NOCOV + from ordereddict import OrderedDict + +from six import iterkeys, iteritems +import xmltodict + +from .errors import MediaTypeError +from .parameters import ( + Body, URIParameter, Header, FormParameter, QueryParameter +) + +PYVER = sys.version_info[:3] + +if PYVER == (2, 7, 9) or PYVER == (3, 4, 3): # NOCOV + import six.moves.urllib.request as urllib + import six.moves.urllib.error as urllib_error + URLLIB = True + SECURE_DOWNLOAD = True +else: + try: # NOCOV + import requests + URLLIB = False + SECURE_DOWNLOAD = True + except ImportError: + import six.moves.urllib.request as urllib + import six.moves.urllib.error as urllib_error + URLLIB = True + SECURE_DOWNLOAD = False + + +IANA_URL = "https://www.iana.org/assignments/media-types/media-types.xml" + + +def load_schema(data): + """ + Load Schema/Example data depending on its type (JSON, XML). + + If error in parsing as JSON and XML, just returns unloaded data. + + :param str data: schema/example data + """ + try: + return json.loads(data) + except Exception: # POKEMON! + pass + + try: + return xmltodict.parse(data) + except Exception: # GOTTA CATCH THEM ALL + pass + + return data + + +def setup_logger(key): + """General logger""" + log = logging.getLogger(__name__) + log.setLevel(logging.DEBUG) + console = logging.StreamHandler() + console.setLevel(logging.DEBUG) + msg = "{key} - %(levelname)s - %(message)s".format(key=key) + formatter = logging.Formatter(msg) + console.setFormatter(formatter) + + log.addHandler(console) + return log + + +def _requests_download(url): + """Download a URL using ``requests`` library""" + try: + response = requests.get(url) + return response.text + except requests.exceptions.RequestException as e: + msg = "Error downloading from {0}: {1}".format(url, e) + raise MediaTypeError(msg) + + +def _urllib_download(url): + """Download a URL using ``urllib`` library""" + try: + response = urllib.urlopen(url) + except urllib_error.URLError as e: + msg = "Error downloading from {0}: {1}".format(url, e) + raise MediaTypeError(msg) + return response.read() + + +def download_url(url): + """ + General download function, given a URL. + + If running 2.7.8 or earlier, or 3.4.2 or earlier, then use + ``requests`` if it's installed. Otherwise, use ``urllib``. + """ + log = setup_logger("DOWNLOAD") + if SECURE_DOWNLOAD and not URLLIB: + return _requests_download(url) + elif SECURE_DOWNLOAD and URLLIB: + return _urllib_download(url) + msg = ("Downloading over HTTPS but can not verify the host's " + "certificate. To avoid this in the future, `pip install" + " \"requests[security]\"`.") + log.warn(msg) + return _urllib_download(url) + + +def _xml_to_dict(response_text): + """Parse XML response from IANA into a Python ``dict``.""" + try: + return xmltodict.parse(response_text) + except xmltodict.expat.ExpatError as e: + msg = "Error parsing XML: {0}".format(e) + raise MediaTypeError(msg) + + +def _extract_mime_types(registry): + """ + Parse out MIME types from a defined registry (e.g. "application", + "audio", etc). + """ + mime_types = [] + records = registry.get("record", {}) + reg_name = registry.get("@id") + for rec in records: + mime = rec.get("file", {}).get("#text") + if mime: + mime_types.append(mime) + else: + mime = rec.get("name") + if mime: + hacked_mime = reg_name + "/" + mime + mime_types.append(hacked_mime) + return mime_types + + +def _parse_xml_data(xml_data): + """Parse the given XML data.""" + registries = xml_data.get("registry", {}).get("registry") + if not registries: + msg = "No registries found to parse." + raise MediaTypeError(msg) + if len(registries) is not 9: + msg = ("Expected 9 registries but parsed " + "{0}".format(len(registries))) + raise MediaTypeError(msg) + all_mime_types = [] + for registry in registries: + mime_types = _extract_mime_types(registry) + all_mime_types.extend(mime_types) + + return all_mime_types + + +def _save_updated_mime_types(output_file, mime_types): + """Save the updated MIME Media types within the package.""" + with open(output_file, "w") as f: + json.dump(mime_types, f) + + +def update_mime_types(): + """ + Update MIME Media Types from IANA. Requires internet connection. + """ + log = setup_logger("UPDATE") + + log.debug("Getting XML data from IANA") + raw_data = download_url(IANA_URL) + log.debug("Data received; parsing...") + xml_data = _xml_to_dict(raw_data) + mime_types = _parse_xml_data(xml_data) + + current_dir = os.path.dirname(os.path.realpath(__file__)) + data_dir = os.path.join(current_dir, "data") + output_file = os.path.join(data_dir, "supported_mime_types.json") + + _save_updated_mime_types(output_file, mime_types) + + log.debug("Done! Supported IANA MIME media types have been updated.") + + +def _resource_type_lookup(assigned, root): + """ + Returns ``ResourceType`` object + + :param str assigned: The string name of the assigned resource type + :param root: RAML root object + """ + res_types = root.resource_types + if res_types: + res_type_obj = [r for r in res_types if r.name == assigned] + if res_type_obj: + return res_type_obj[0] + + +##### +# Helper methods +##### + +# general +def _get(data, item, default=None): + """ + Helper function to catch empty mappings in RAML. If item is optional + but not in the data, or data is ``None``, the default value is returned. + + :param data: RAML data + :param str item: RAML key + :param default: default value if item is not in dict + :param bool optional: If RAML item is optional or needs to be defined + :ret: value for RAML key + """ + try: + return data.get(item, default) + except AttributeError: + return default + + +def _create_base_param_obj(attribute_data, param_obj, config, errors, **kw): + """Helper function to create a BaseParameter object""" + objects = [] + + for key, value in list(iteritems(attribute_data)): + if param_obj is URIParameter: + required = _get(value, "required", default=True) + else: + required = _get(value, "required", default=False) + kwargs = dict( + name=key, + raw={key: value}, + desc=_get(value, "description"), + display_name=_get(value, "displayName", key), + min_length=_get(value, "minLength"), + max_length=_get(value, "maxLength"), + minimum=_get(value, "minimum"), + maximum=_get(value, "maximum"), + default=_get(value, "default"), + enum=_get(value, "enum"), + example=_get(value, "example"), + required=required, + repeat=_get(value, "repeat", False), + pattern=_get(value, "pattern"), + type=_get(value, "type", "string"), + config=config, + errors=errors + ) + if param_obj is Header: + kwargs["method"] = _get(kw, "method") + + item = param_obj(**kwargs) + objects.append(item) + + return objects or None + + +# used for traits & resource nodes +def _map_param_unparsed_str_obj(param): + return { + "queryParameters": QueryParameter, + "uriParameters": URIParameter, + "formParameters": FormParameter, + "baseUriParameters": URIParameter, + "headers": Header + }[param] + + +# create_resource_types +def _get_union(resource, method, inherited): + union = {} + for key, value in list(iteritems(inherited)): + if resource.get(method) is not None: + if key not in list(iterkeys(resource.get(method, {}))): + union[key] = value + else: + resource_values = resource.get(method, {}).get(key) + inherited_values = inherited.get(key, {}) + union[key] = dict(list(iteritems(resource_values)) + + list(iteritems(inherited_values))) + if resource.get(method) is not None: + for key, value in list(iteritems(resource.get(method, {}))): + if key not in list(iterkeys(inherited)): + union[key] = value + return union + + +def __is_scalar(item): + scalar_props = [ + "type", "enum", "pattern", "minLength", "maxLength", + "minimum", "maximum", "example", "repeat", "required", + "default", "description", "usage", "schema", "example", + "displayName" + ] + return item in scalar_props + + +def __get_sets(child, parent): + child_keys = [] + parent_keys = [] + if child: + child_keys = list(iterkeys(child)) + if parent: + parent_keys = list(iterkeys(parent)) + child_diff = list(set(child_keys) - set(parent_keys)) + parent_diff = list(set(parent_keys) - set(child_keys)) + intersection = list(set(child_keys).intersection(parent_keys)) + opt_inters = [i for i in child_keys if str(i) + "?" in parent_keys] + intersection = intersection + opt_inters + + return child, parent, child_diff, parent_diff, intersection + + +def _get_data_union(child, parent): + # takes child data and parent data and merges them + # with preference to child data overwriting parent data + # confession: had to look up set theory! + # FIXME: should bring this over from config, not hard code + methods = [ + 'get', 'post', 'put', 'delete', 'patch', 'head', 'options', + 'trace', 'connect', 'get?', 'post?', 'put?', 'delete?', 'patch?', + 'head?', 'options?', 'trace?', 'connect?' + ] + union = {} + child, parent, c_diff, p_diff, inters = __get_sets(child, parent) + + for i in c_diff: + union[i] = child.get(i) + for i in p_diff: + if i in methods and not i.endswith("?"): + union[i] = parent.get(i) + if i not in methods: + union[i] = parent.get(i) + for i in inters: + if __is_scalar(i): + union[i] = child.get(i) + else: + _child = child.get(i, {}) + _parent = parent.get(i, {}) + union[i] = _get_data_union(_child, _parent) + return union + + +def _get_inherited_resource(res_name, resource_types): + for resource in resource_types: + if res_name == list(iterkeys(resource))[0]: + return resource + + +def _get_res_type_attribute(res_data, method_data, item, default={}): + method_level = _get(method_data, item, default) + resource_level = _get(res_data, item, default) + return method_level, resource_level + + +def _get_inherited_type_params(data, attribute, params, resource_types): + inherited = _get_inherited_resource(data.get("type"), resource_types) + inherited = inherited.get(data.get("type")) + inherited_params = inherited.get(attribute, {}) + + return dict(list(iteritems(params)) + + list(iteritems(inherited_params))) + + +def _get_inherited_item(items, item_name, res_types, meth_, _data): + inherited = _get_inherited_resource(_data.get("type"), res_types) + resource = inherited.get(_data.get("type")) + res_level = resource.get(meth_, {}).get(item_name, {}) + + method_ = resource.get(meth_, {}) + method_level = method_.get(item_name, {}) + items = dict( + list(iteritems(items)) + + list(iteritems(res_level)) + + list(iteritems(method_level)) + ) + return items + + +def _get_attribute_dict(data, item, v): + resource_level = _get(v, item, {}) + method_level = _get(data, item, {}) + return dict(list(iteritems(resource_level)) + + list(iteritems(method_level))) + + +def _map_attr(attribute): + return { + "mediaType": "media_type", + "protocols": "protocols", + "headers": "headers", + "body": "body", + "responses": "responses", + "uriParameters": "uri_params", + "baseUriParameters": "base_uri_params", + "queryParameters": "query_params", + "formParameters": "form_params", + "description": "description" + }[attribute] + + +# create_node +def _get_method(attribute, method, raw_data): + """Returns ``attribute`` defined at the method level, or ``None``.""" + # if method is not None: # must explicitly say `not None` + # get_attribute = raw_data.get(method, {}) + # if get_attribute is not None: + # return get_attribute.get(attribute, {}) + # return {} + # if method is not None: + ret = _get(raw_data, method, {}) + ret = _get(ret, attribute, {}) + return ret + + +def _get_resource(attribute, raw_data): + """Returns ``attribute`` defined at the resource level, or ``None``.""" + return raw_data.get(attribute, {}) + + +def _get_parent(attribute, parent): + if parent: + return getattr(parent, attribute, {}) + return {} + + +# needs/uses parsed raml data +def _get_resource_type(attribute, root, type_, method): + """Returns ``attribute`` defined in the resource type, or ``None``.""" + if type_ and root.resource_types: + types = root.resource_types + r_type = [r for r in types if r.name == type_] + r_type = [r for r in r_type if r.method == method] + if r_type: + if hasattr(r_type[0], attribute): + if getattr(r_type[0], attribute) is not None: + return getattr(r_type[0], attribute) + return [] + + +def _get_trait(attribute, root, is_): + """Returns ``attribute`` defined in a trait, or ``None``.""" + + if is_: + traits = root.traits + if traits: + trait_objs = [] + for i in is_: + trait = [t for t in traits if t.name == i] + if trait: + if hasattr(trait[0], attribute): + if getattr(trait[0], attribute) is not None: + trait_objs.extend(getattr(trait[0], attribute)) + return trait_objs + return [] + + +def _get_scheme(item, root): + schemes = root.raw.get("securitySchemes", []) + for s in schemes: + if isinstance(item, str): + if item == list(iterkeys(s))[0]: + return s + elif isinstance(item, dict): + if list(iterkeys(item))[0] == list(iterkeys(s))[0]: + return s + + +def _get_attribute(attribute, method, raw_data): + method_level = _get_method(attribute, method, raw_data) + resource_level = _get_resource(attribute, raw_data) + return OrderedDict(list(iteritems(method_level)) + + list(iteritems(resource_level))) + + +def _get_inherited_attribute(attribute, root, type_, method, is_): + type_objects = _get_resource_type(attribute, root, type_, method) + trait_objects = _get_trait(attribute, root, is_) + return type_objects + trait_objects + + +# TODO: refactor - this ain't pretty +def _remove_duplicates(inherit_params, resource_params): + ret = [] + if isinstance(resource_params[0], Body): + _params = [p.mime_type for p in resource_params] + else: + _params = [p.name for p in resource_params] + + for p in inherit_params: + if isinstance(p, Body): + if p.mime_type not in _params: + ret.append(p) + else: + if p.name not in _params: + ret.append(p) + ret.extend(resource_params) + return ret or None + + +def _map_inheritance(nodetype): + return { + "traits": __trait, + "types": __resource_type, + "method": __method, + "resource": __resource, + "parent": __parent, + "root": __root + }[nodetype] + + +def __trait(item, **kwargs): + root = kwargs.get("root") + is_ = kwargs.get("is_") + return _get_trait(item, root, is_) + + +def __resource_type(item, **kwargs): + root = kwargs.get("root") + type_ = kwargs.get("type_") + method = kwargs.get("method") + item = _map_attr(item) + return _get_resource_type(item, root, type_, method) + + +def __method(item, **kwargs): + method = kwargs.get("method") + data = kwargs.get("data") + return _get_method(item, method, data) + + +def __resource(item, **kwargs): + data = kwargs.get("data") + return _get_resource(item, data) + + +def __parent(item, **kwargs): + parent = kwargs.get("parent") + return _get_parent(item, parent) + + +def __root(item, **kwargs): + root = kwargs.get("root") + item = _map_attr(item) + return getattr(root, item, None) + + +def get_inherited(item, inherit_from=[], **kwargs): + ret = {} + for nodetype in inherit_from: + inherit_func = _map_inheritance(nodetype) + inherited = inherit_func(item, **kwargs) + ret[nodetype] = inherited + return ret + + +##### +# set uri, form, query, header objects for traits +##### + +def set_param_object(param_data, param_str, root): + params = _get(param_data, param_str, {}) + param_obj = _map_param_unparsed_str_obj(param_str) + return _create_base_param_obj(params, + param_obj, + root.config, + root.errors) + + +##### +# set query, form, uri params for resource nodes +##### + +# <--[uri]--> +def __create_params(unparsed, parsed, method, raw_data, root, cls, type_, is_): + _params = _get_attribute(unparsed, method, raw_data) + param_objs = _get_inherited_attribute(parsed, root, type_, + method, is_) + params = _create_base_param_obj(_params, cls, root.config, root.errors) + return params, param_objs + + +def _create_uri_params(unparsed, parsed, root_params, root, type_, is_, + method, raw_data, parent): + params, param_objs = __create_params(unparsed, parsed, method, raw_data, + root, URIParameter, type_, is_) + + if params: + param_objs.extend(params) + if parent and parent.uri_params: + param_objs.extend(parent.uri_params) + if root_params: + param_names = [p.name for p in param_objs] + _params = [p for p in root_params if p.name not in param_names] + param_objs.extend(_params) + return param_objs or None +# <--[uri]--> + + +# <--[query, base uri, form]--> +def _check_already_exists(param, ret_list): + if isinstance(param, Body): + param_name_list = [p.mime_type for p in ret_list] + if param.mime_type not in param_name_list: + ret_list.append(param) + param_name_list.append(param.mime_type) + + else: + param_name_list = [p.name for p in ret_list] + if param.name not in param_name_list: + ret_list.append(param) + param_name_list.append(param.name) + return ret_list + + +# TODO: refactor - this ain't pretty +def __remove_duplicates(to_clean): + # order: resource, inherited, parent, root + ret = [] + + for param_set in to_clean: + if param_set: + for p in param_set: + ret = _check_already_exists(p, ret) + return ret or None + + +def _map_parsed_str(parsed): + name = parsed.split("_")[:-1] + name.append("parameters") + name = [n.capitalize() for n in name] + name = "".join(name) + return name[0].lower() + name[1:] + + +def set_params(data, param_str, root, method, inherit=False, **kw): + params, param_objs, parent_params, root_params = [], [], [], [] + + unparsed = _map_parsed_str(param_str) + param_obj = _map_param_unparsed_str_obj(unparsed) + _params = _get_attribute(unparsed, method, data) + + params = _create_base_param_obj(_params, param_obj, + root.config, root.errors) + if params is None: + params = [] + + # inherited objects + if inherit: + type_ = kw.get("type_") + is_ = kw.get("is_") + param_objs = _get_inherited_attribute(param_str, root, type_, + method, is_) + + # parent objects + parent = kw.get("parent") + if parent: + parent_params = getattr(parent, param_str, []) + + # root objects + root = kw.get("root_params") + if root: + param_names = [p.name for p in param_objs] + root_params = [p for p in root if p.name not in param_names] + + to_clean = (params, param_objs, parent_params, root_params) + + return __remove_duplicates(to_clean) +# <--[query, base uri, form]--> + + +# preserve order of URI and Base URI parameters +# used for RootNode, ResourceNode +def _preserve_uri_order(path, param_objs, config, errors, declared=[]): + # if this is hit, RAML shouldn't be valid anyways. + if isinstance(path, list): + path = path[0] + + sorted_params = [] + pattern = "\{(.*?)\}" + params = re.findall(pattern, path) + if not param_objs: + param_objs = [] + # if there are URI parameters in the path but were not declared + # inline, we should create them. + # TODO: Probably shouldn't do it in this function, though... + if len(params) > len(param_objs): + if len(param_objs) > 0: + param_names = [p.name for p in param_objs] + missing = [p for p in params if p not in param_names] + else: + missing = params[::] + # exclude any (base)uri params if already declared + missing = [p for p in missing if p not in declared] + for m in missing: + # no need to create a URI param for version + if m == "version": + continue + data = {"type": "string"} + _param = URIParameter(name=m, + raw={m: data}, + required=True, + display_name=m, + desc=_get(data, "description"), + min_length=_get(data, "minLength"), + max_length=_get(data, "maxLength"), + minimum=_get(data, "minimum"), + maximum=_get(data, "maximum"), + default=_get(data, "default"), + enum=_get(data, "enum"), + example=_get(data, "example"), + repeat=_get(data, "repeat", False), + pattern=_get(data, "pattern"), + type=_get(data, "type", "string"), + config=config, + errors=errors) + param_objs.append(_param) + for p in params: + _param = [i for i in param_objs if i.name == p] + if _param: + sorted_params.append(_param[0]) + return sorted_params or None diff --git a/tests/unit/utils/test_init.py b/tests/unit/utils/test_init.py index 6d27f00..46e5f2d 100644 --- a/tests/unit/utils/test_init.py +++ b/tests/unit/utils/test_init.py @@ -22,6 +22,9 @@ ##### +if sys.version_info[0] == 2: + from io import open + if sys.version_info[0] == 2: from io import open diff --git a/tox.ini b/tox.ini index e9f6d6a..78c6b3c 100644 --- a/tox.ini +++ b/tox.ini @@ -32,6 +32,14 @@ deps = commands = flake8 ramlfications tests --exclude=docs/ --ignore=E221,F405,W503,W504,F901 +# pep8 / flake8: Exclude the documentation files, +# and ignore E221 (multiple spaces before operator) +[pep8] +exclude = docs/ +ignore = E221 +[flake8] +exclude = docs/ +ignore = E221 [testenv:manifest] basepython = python3