Skip to content

Commit

Permalink
v0.6 add json-schema parser and python code generator
Browse files Browse the repository at this point in the history
  • Loading branch information
voidZXL committed Oct 21, 2024
1 parent bfb846b commit c3a84ef
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 25 deletions.
2 changes: 1 addition & 1 deletion utype/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
register_transformer = TypeTransformer.registry.register


VERSION = (0, 6, 0, 'alpha')
VERSION = (0, 6, 0)


def _get_version():
Expand Down
17 changes: 15 additions & 2 deletions utype/parser/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def __init__(self, rule_cls: Type["Rule"]):

@property
def origin_type(self):
return self.rule_cls.__origin__
return self.rule_cls._get_origin(self.rule_cls)

@origin_type.setter
def origin_type(self, t: type):
Expand Down Expand Up @@ -681,7 +681,20 @@ def valid_types(self, bounds: dict):
raise exc.ConfigError(
f"Constraint: {repr(key)} is only for type: {origin_types}, got bool"
)
if not issubclass(self.origin_type, origin_types):
origin = self.origin_type
origins = []
while True:
find = issubclass(origin, origin_types)
if find:
break
origin = getattr(origin, "__origin__", None)
if not origin:
break
if origin in origins:
break
origins.append(origin)

if not find:
raise exc.ConfigError(
f"Constraint: {repr(key)} is only for type: "
f"{origin_types}, got {self.origin_type}"
Expand Down
13 changes: 11 additions & 2 deletions utype/specs/json_schema/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,19 @@ def generate_for_field(self, f: ParserField, options: Options = None) -> Optiona
if aliases:
# sort to stay identical
aliases.sort()
data.update(aliases=aliases)
data.update({
'x-var-name': f.attname,
'x-aliases': aliases,
'aliases': aliases, # compat old version, will be deprecated
})

annotations = f.schema_annotations
if annotations:
data.update({
'x-annotation': annotations
})
return data

# todo: de-duplicate generated schema class like UserSchema['a']
def generate_for_dataclass(self, t):
# name = t.__qualname__
parser: ClassParser = getattr(t, '__parser__')
Expand All @@ -292,6 +296,7 @@ def generate_for_dataclass(self, t):
data = {"type": "object"}
required = []
properties = {}
dependent_required = {}
options = parser.options

if self.output:
Expand All @@ -304,6 +309,8 @@ def generate_for_dataclass(self, t):
if value is None:
continue
properties[name] = value
if field.dependencies:
dependent_required[name] = field.dependencies
if field.is_required(options or self.options):
# will count options.ignore_required in
required.append(name)
Expand All @@ -315,6 +322,8 @@ def generate_for_dataclass(self, t):
data.update(properties=properties)
if required:
data.update(required=required)
if dependent_required:
data.update(dependentRequired=dependent_required)
addition = options.addition
if addition is not None:
if isinstance(addition, type):
Expand Down
93 changes: 84 additions & 9 deletions utype/specs/json_schema/parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from typing import Dict, Union, Tuple, Any, List, Type
from typing import Dict, Union, Tuple, Any, List, Type, Optional
from utype.utils.compat import ForwardRef
from utype.parser.rule import LogicalType, Rule
from utype.parser.field import Field
from utype.schema import LogicalMeta, Schema, DataClass
from utype.parser.options import Options
from utype.utils.datastructures import unprovided
from utype.utils.functional import valid_attr
from . import constant
import re
import keyword

_type = type

Expand All @@ -17,34 +20,59 @@ class JsonSchemaParser:
field_cls = Field
default_type = str

NON_NAME_REG = '[^A-Za-z0-9]+'

def __init__(self, json_schema: dict,
refs: Dict[str, type] = None,
refs: Dict[str, dict] = None,
name: str = None,
description: str = None,
# '#/components/...': SchemaClass
# names: Dict[str, type] = None,
ref_prefix: str = None, # '#/components/schemas'
def_prefix: str = None, # 'schemas'
type_map: dict = None,
force_forward_ref: bool = False,
):

if not isinstance(json_schema, dict):
raise TypeError(f'Invalid json schema: {json_schema}')
if force_forward_ref:
if refs is None:
raise ValueError('JsonSchemaParser force forward ref, but refs is None')
self.json_schema = json_schema
self.refs = refs
self.name = name
self.name = self.get_attname(name) if name else None
self.description = description
self.ref_prefix = (ref_prefix.rstrip('/') + '/') if ref_prefix else ''
self.def_prefix = (def_prefix.rstrip('.') + '.') if def_prefix else ''
self.force_forward_ref = force_forward_ref
_type_map = dict(constant.TYPE_MAP)
if type_map:
_type_map.update(type_map)
self.type_map = _type_map

def get_ref_object(self, ref: str) -> Optional[dict]:
if not self.refs:
return None
if ref.startswith(self.ref_prefix):
ref = ref[len(self.ref_prefix):]
ref_routes = ref.strip('/').split('/')
obj = self.refs
for route in ref_routes:
if not obj:
return None
obj = obj.get(route)
return None

def get_def_name(self, ref: str) -> str:
ref_name = ref.lstrip(self.ref_prefix)
if ref.startswith(self.ref_prefix):
ref = ref[len(self.ref_prefix):]
ref_name = self.get_attname(ref)
return self.def_prefix + ref_name

# def get_ref_name(self, name: str) -> str:
# return f'{self.ref_prefix.rstrip("/")}/{name.lstrip("/")}'

# def parse_type(self, schema: dict) -> type:
# return self.__class__(
# json_schema=schema,
Expand All @@ -62,30 +90,35 @@ def get_constraints(cls, schema: dict):
return constraints

def parse_field(self, schema: dict,
name: str = None,
field_cls: Type[Field] = None,
required: bool = None,
description: str = None,
dependencies: List[str] = None,
alias: str = None,
**kwargs,
) -> Tuple[type, Field]:
type = self.parse_type(schema, with_constraints=False)
type = self.parse_type(schema, name=name, with_constraints=False)
# annotations
default = schema.get('default', unprovided)
deprecated = schema.get('deprecated', False)
title = schema.get('title')
description = schema.get('description') or description
readonly = schema.get('readOnly')
writeonly = schema.get('writeOnly')
aliases = schema.get('x-aliases')
kwargs.update(self.get_constraints(schema))
kwargs.update(
alias=alias,
default=default,
deprecated=deprecated,
title=title,
description=description,
readonly=readonly,
writeonly=writeonly,
required=required,
dependencies=dependencies
dependencies=dependencies,
alias_from=aliases
)
field_cls = field_cls or self.field_cls
return type, field_cls(**kwargs)
Expand Down Expand Up @@ -165,6 +198,19 @@ def parse_type(self, schema: dict,
)
return t

@classmethod
def get_attname(cls, name: str, excludes: list = None):
name = re.sub(cls.NON_NAME_REG, '_', name).strip('_')
if keyword.iskeyword(name):
name += '_value'
if excludes:
i = 1
origin = name
while name in excludes:
name = f'{origin}_{i}'
i += 1
return name

def parse_object(self,
schema: dict,
name: str = None,
Expand Down Expand Up @@ -205,15 +251,35 @@ def parse_object(self,
)

for key, prop in properties.items():
prop = prop or {}
field_required = key in required if required else False
field_dependencies = dependent_required.get(key) if dependent_required else None
ref = prop.get('$ref')
if ref:
prop_schema = self.get_ref_object(ref) or {}
else:
prop_schema = prop
attname = prop_schema.get('x-var-name') or key
if not valid_attr(attname) or attname in attrs or hasattr(dict, attname):
attname = self.get_attname(attname, excludes=list(attrs))
alias = None
if attname != key:
alias = key
field_type, field = self.parse_field(
prop,
required=field_required,
dependencies=field_dependencies
dependencies=field_dependencies,
alias=alias
)
annotations[key] = field_type
attrs[key] = field
annotations[attname] = field_type
attrs[attname] = field

if self.force_forward_ref:
# return after parse all fields
# cause even if it's in force_forward_ref
# there may be schemas inside the schema field types
def_name = self.register_ref(name=name, schema=schema)
return ForwardRef(def_name)

attrs.update(
__annotations__=annotations,
Expand All @@ -224,6 +290,15 @@ def parse_object(self,
new_cls = self.object_meta_cls(name, (self.object_base_cls,), attrs)
return new_cls

def register_ref(self, name: str, schema: dict) -> str:
i = 1
cls_name = name
while name in self.refs:
name = f'{cls_name}_{i}'
i += 1
self.refs[name] = schema
return self.get_def_name(name)

def parse_array(self,
schema: dict,
name: str = None,
Expand Down
Loading

0 comments on commit c3a84ef

Please sign in to comment.