Skip to content

Commit

Permalink
[wip] workaround conceptual nested-process recursive colander/cornice…
Browse files Browse the repository at this point in the history
… schema references
  • Loading branch information
fmigneault committed Oct 29, 2024
1 parent 4c2c023 commit cedb286
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 26 deletions.
26 changes: 18 additions & 8 deletions weaver/wps_restapi/colander_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,10 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase):
``schema_include_deserialize``, ``schema_include_convert_type`` and ``schema_meta_include_convert_type`` can be
used to control individually each schema inclusion during either the type conversion context (:term:`JSON` schema)
or the deserialization context (:term:`JSON` data validation).
Additionally, the ``_schema_extra`` attribute and the corresponding ``schema_extra`` initialization parameter can
be specified to inject further :term:`OpenAPI` schema definitions into the generated schema. Note that duplicate
properties specified by this extra definition will override any automatically generated schema properties.
"""
_extension = "_ext_schema_ref"
_ext_schema_options = [
Expand All @@ -1370,8 +1374,9 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase):
"_schema_include",
"_schema_include_deserialize",
"_schema_include_convert_type",
"_schema_extra",
]
_ext_schema_fields = ["_id", "_schema"]
_ext_schema_fields = ["_id", "_schema", "_schema_extra"]

# typings and attributes to help IDEs flag that the field is available/overridable

Expand All @@ -1384,6 +1389,8 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase):
_schema_include_deserialize = True # type: bool
_schema_include_convert_type = True # type: bool

_schema_extra = None # type: Optional[OpenAPISchema]

def __init__(self, *args, **kwargs):
for schema_key in self._schema_options:
schema_field = schema_key[1:]
Expand Down Expand Up @@ -1413,8 +1420,8 @@ def _schema_options(self):
def _schema_fields(self):
return getattr(self, "_ext_schema_fields", SchemaRefMappingSchema._ext_schema_fields)

def _schema_deserialize(self, cstruct, schema_meta, schema_id):
# type: (OpenAPISchema, Optional[str], Optional[str]) -> OpenAPISchema
def _schema_deserialize(self, cstruct, schema_meta=None, schema_id=None, schema_extra=None):
# type: (OpenAPISchema, Optional[str], Optional[str], Optional[OpenAPISchema]) -> OpenAPISchema
"""
Applies the relevant schema references and properties depending on :term:`JSON` schema/data conversion context.
"""
Expand All @@ -1439,6 +1446,7 @@ def _schema_deserialize(self, cstruct, schema_meta, schema_id):
schema_result[schema_field] = schema.deserialize(cstruct.get(schema_field))

schema_result.update(cstruct)
schema_result.update(schema_extra or {})
return schema_result

def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs
Expand All @@ -1463,8 +1471,8 @@ def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs
return self._schema_deserialize(cstruct, schema_id, None)
return cstruct

def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs
# type: (OpenAPISchema) -> OpenAPISchema
def convert_type(self, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas
# type: (OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema
"""
Converts the node to obtain the :term:`JSON` schema definition.
"""
Expand All @@ -1473,12 +1481,13 @@ def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs
schema_id_include_convert_type = getattr(self, "_schema_include_convert_type", False)
schema_meta_include = getattr(self, "_schema_meta_include", False)
schema_meta_include_convert_type = getattr(self, "_schema_meta_include_convert_type", False)
schema_extra = getattr(self, "_schema_extra", None)
if schema_id_include and schema_id_include_convert_type:
schema_id = getattr(self, "_schema", None)
if schema_meta_include and schema_meta_include_convert_type:
schema_meta = getattr(self, "_schema_meta", None)
if schema_id or schema_meta:
return self._schema_deserialize(cstruct, schema_meta, schema_id)
if schema_id or schema_meta or schema_extra:
return self._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra)
return cstruct

@staticmethod
Expand Down Expand Up @@ -2532,7 +2541,8 @@ def convert_type(self, schema_node):
result = super(SchemaRefConverter, self).convert_type(schema_node)
if isinstance(schema_node, SchemaRefMappingSchema):
# apply any resolved schema references at the top of the definition
result_ref = SchemaRefMappingSchema.convert_type(schema_node, {})
converter = getattr(type(schema_node), "convert_type", SchemaRefMappingSchema.convert_type)
result_ref = converter(schema_node, {}, dispatcher=self.dispatcher)
result_ref.update(result)
result = result_ref
return result
Expand Down
93 changes: 75 additions & 18 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
ExtendedFloat as Float,
ExtendedInteger as Integer,
ExtendedMappingSchema,
ExtendedObjectTypeConverter,
ExtendedSchemaNode,
ExtendedSequenceSchema,
ExtendedString as String,
Expand Down Expand Up @@ -3935,6 +3936,45 @@ class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema
)


class ExecuteNestedProcessInput(ExtendedMappingSchema):
"""
Dynamically defines the nested process input.
This class must create the nested properties dynamically because the required classes are not yet defined, and
those required definitions also depend on this class to define the nested process as a possible input value.
.. note::
This class acts as a :class:`colander.SchemaNode` and a `cornice.TypeConverter` simultaneously through
a dual interface invoked through :class:`weaver.wps_restapi.colander_extras.SchemaRefConverter`.
"""
_schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml"
description = "Nested process to execute, for which the selected output will become the input of the parent call."

# 'process' is required for a nested definition, otherwise it will not even be detected as one!
process = ProcessURL(description="Process reference to be executed.")

def deserialize(self, cstruct):
"""
Defer deserialization validation to the class that contains the set of expected properties.
Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition,
although correspondance is not explicitly ensured.
"""
local_result = super().deserialize(cstruct)
defer_result = ExecuteParameters().deserialize(cstruct)
local_result.update(defer_result or {})
return local_result

def convert_type(self, cstruct, dispatcher):
defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters())
local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self)
# local definitions take precedence to reflect alternate requirements
# defer the missing properties from the other schema (but only properties, to not override requirements)
defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()}
local_schema.update(defer_schema)
return local_schema


# Backward compatible data-input that allows values to be nested under 'data' or 'value' fields,
# both for literal values and link references, for inputs submitted as list-items.
# Also allows the explicit 'href' (+ optional format) reference for a link.
Expand Down Expand Up @@ -3962,6 +4002,8 @@ class ExecuteInputAnyType(OneOfKeywordSchema):
ExecuteReference(),
# HTTP reference to a 'collection' with optional processing arguments
ExecuteCollectionInput(),
# Nested Process with its own inputs and outputs
ExecuteNestedProcessInput(),
]


Expand Down Expand Up @@ -4149,7 +4191,7 @@ class ExecuteInputInlineOrRefData(OneOfKeywordSchema):
ExecuteInputQualifiedValue(), # {"value": <anything>, "mediaType": "<>", "schema": <OAS link or object>}
ExecuteInputFile(), # 'href' with either 'type' (OGC) or 'format' (OLD)
ExecuteCollectionInput(), # 'collection' with optional processing operations
# FIXME: 'nested process' (https://github.com/crim-ca/weaver/issues/412)
ExecuteNestedProcessInput(), # 'process' with nested 'inputs', 'outputs', etc.
]


Expand Down Expand Up @@ -4231,31 +4273,20 @@ class ExecuteInputOutputs(ExtendedMappingSchema):
)


class Execute(ExecuteInputOutputs):
# OGC 'execute.yaml' does not enforce any required item
class ExecuteParameters(ExecuteInputOutputs):
"""
Basic execution parameters that can be submitted to run a process.
These parameters can be either for a top-level process job, or any nested process call.
"""
_schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml"
examples = {
"ExecuteJSON": {
"summary": "Execute a process job using REST JSON payload with OGC API schema.",
"value": EXAMPLES["job_execute.json"],
},
}
process = ProcessURL(
missing=drop,
description=(
"Process reference to be executed. "
"This parameter is required if the process cannot be inferred from the request endpoint."
),
)
title = JobTitle(missing=drop)
status = JobStatusCreate(
description=(
"Status to request creation of the job without submitting it to processing queue "
"and leave it pending until triggered by another results request to start it "
"(see *OGC API - Processes* - Part 4: Job Management)."
),
missing=drop,
)
mode = JobExecuteModeEnum(
missing=drop,
default=ExecuteMode.AUTO,
Expand All @@ -4275,6 +4306,32 @@ class Execute(ExecuteInputOutputs):
subscribers = JobExecuteSubscribers(missing=drop)


class Execute(ExecuteParameters):
"""
Main execution parameters that can be submitted to run a process.
Additional parameters are only applicable to the top-most process in a nested definition.
"""
# OGC 'execute.yaml' does not enforce any required item
description = "Process execution parameters."
_schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml"
process = ProcessURL(
missing=drop,
description=(
"Process reference to be executed. "
"This parameter is required if the process cannot be inferred from the request endpoint."
),
)
status = JobStatusCreate(
description=(
"Status to request creation of the job without submitting it to processing queue "
"and leave it pending until triggered by another results request to start it "
"(see *OGC API - Processes* - Part 4: Job Management)."
),
missing=drop,
)


class QuoteStatusSchema(ExtendedSchemaNode):
schema_type = String
validator = OneOf(QuoteStatus.values())
Expand Down

0 comments on commit cedb286

Please sign in to comment.