From cedb28656012d276e675a4bf5b016cea96ae34c9 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 28 Oct 2024 20:06:15 -0400 Subject: [PATCH] [wip] workaround conceptual nested-process recursive colander/cornice schema references --- weaver/wps_restapi/colander_extras.py | 26 +++++-- weaver/wps_restapi/swagger_definitions.py | 93 ++++++++++++++++++----- 2 files changed, 93 insertions(+), 26 deletions(-) diff --git a/weaver/wps_restapi/colander_extras.py b/weaver/wps_restapi/colander_extras.py index 2f85b2aac..cc61ecd82 100644 --- a/weaver/wps_restapi/colander_extras.py +++ b/weaver/wps_restapi/colander_extras.py @@ -1360,6 +1360,10 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): ``schema_include_deserialize``, ``schema_include_convert_type`` and ``schema_meta_include_convert_type`` can be used to control individually each schema inclusion during either the type conversion context (:term:`JSON` schema) or the deserialization context (:term:`JSON` data validation). + + Additionally, the ``_schema_extra`` attribute and the corresponding ``schema_extra`` initialization parameter can + be specified to inject further :term:`OpenAPI` schema definitions into the generated schema. Note that duplicate + properties specified by this extra definition will override any automatically generated schema properties. """ _extension = "_ext_schema_ref" _ext_schema_options = [ @@ -1370,8 +1374,9 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): "_schema_include", "_schema_include_deserialize", "_schema_include_convert_type", + "_schema_extra", ] - _ext_schema_fields = ["_id", "_schema"] + _ext_schema_fields = ["_id", "_schema", "_schema_extra"] # typings and attributes to help IDEs flag that the field is available/overridable @@ -1384,6 +1389,8 @@ class SchemaRefMappingSchema(ExtendedNodeInterface, ExtendedSchemaBase): _schema_include_deserialize = True # type: bool _schema_include_convert_type = True # type: bool + _schema_extra = None # type: Optional[OpenAPISchema] + def __init__(self, *args, **kwargs): for schema_key in self._schema_options: schema_field = schema_key[1:] @@ -1413,8 +1420,8 @@ def _schema_options(self): def _schema_fields(self): return getattr(self, "_ext_schema_fields", SchemaRefMappingSchema._ext_schema_fields) - def _schema_deserialize(self, cstruct, schema_meta, schema_id): - # type: (OpenAPISchema, Optional[str], Optional[str]) -> OpenAPISchema + def _schema_deserialize(self, cstruct, schema_meta=None, schema_id=None, schema_extra=None): + # type: (OpenAPISchema, Optional[str], Optional[str], Optional[OpenAPISchema]) -> OpenAPISchema """ Applies the relevant schema references and properties depending on :term:`JSON` schema/data conversion context. """ @@ -1439,6 +1446,7 @@ def _schema_deserialize(self, cstruct, schema_meta, schema_id): schema_result[schema_field] = schema.deserialize(cstruct.get(schema_field)) schema_result.update(cstruct) + schema_result.update(schema_extra or {}) return schema_result def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs @@ -1463,8 +1471,8 @@ def _deserialize_impl(self, cstruct): # pylint: disable=W0222,signature-differs return self._schema_deserialize(cstruct, schema_id, None) return cstruct - def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs - # type: (OpenAPISchema) -> OpenAPISchema + def convert_type(self, cstruct, dispatcher=None): # noqa # parameter to allow forwarding ref for override schemas + # type: (OpenAPISchema, Optional[TypeConversionDispatcher]) -> OpenAPISchema """ Converts the node to obtain the :term:`JSON` schema definition. """ @@ -1473,12 +1481,13 @@ def convert_type(self, cstruct): # pylint: disable=W0222,signature-differs schema_id_include_convert_type = getattr(self, "_schema_include_convert_type", False) schema_meta_include = getattr(self, "_schema_meta_include", False) schema_meta_include_convert_type = getattr(self, "_schema_meta_include_convert_type", False) + schema_extra = getattr(self, "_schema_extra", None) if schema_id_include and schema_id_include_convert_type: schema_id = getattr(self, "_schema", None) if schema_meta_include and schema_meta_include_convert_type: schema_meta = getattr(self, "_schema_meta", None) - if schema_id or schema_meta: - return self._schema_deserialize(cstruct, schema_meta, schema_id) + if schema_id or schema_meta or schema_extra: + return self._schema_deserialize(cstruct, schema_meta, schema_id, schema_extra) return cstruct @staticmethod @@ -2532,7 +2541,8 @@ def convert_type(self, schema_node): result = super(SchemaRefConverter, self).convert_type(schema_node) if isinstance(schema_node, SchemaRefMappingSchema): # apply any resolved schema references at the top of the definition - result_ref = SchemaRefMappingSchema.convert_type(schema_node, {}) + converter = getattr(type(schema_node), "convert_type", SchemaRefMappingSchema.convert_type) + result_ref = converter(schema_node, {}, dispatcher=self.dispatcher) result_ref.update(result) result = result_ref return result diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index e4975abff..f9c9d21ca 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -122,6 +122,7 @@ ExtendedFloat as Float, ExtendedInteger as Integer, ExtendedMappingSchema, + ExtendedObjectTypeConverter, ExtendedSchemaNode, ExtendedSequenceSchema, ExtendedString as String, @@ -3935,6 +3936,45 @@ class ExecuteCollectionInput(FilterSchema, SortBySchema, PermissiveMappingSchema ) +class ExecuteNestedProcessInput(ExtendedMappingSchema): + """ + Dynamically defines the nested process input. + + This class must create the nested properties dynamically because the required classes are not yet defined, and + those required definitions also depend on this class to define the nested process as a possible input value. + + .. note:: + This class acts as a :class:`colander.SchemaNode` and a `cornice.TypeConverter` simultaneously through + a dual interface invoked through :class:`weaver.wps_restapi.colander_extras.SchemaRefConverter`. + """ + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + description = "Nested process to execute, for which the selected output will become the input of the parent call." + + # 'process' is required for a nested definition, otherwise it will not even be detected as one! + process = ProcessURL(description="Process reference to be executed.") + + def deserialize(self, cstruct): + """ + Defer deserialization validation to the class that contains the set of expected properties. + + Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition, + although correspondance is not explicitly ensured. + """ + local_result = super().deserialize(cstruct) + defer_result = ExecuteParameters().deserialize(cstruct) + local_result.update(defer_result or {}) + return local_result + + def convert_type(self, cstruct, dispatcher): + defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) + local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) + # local definitions take precedence to reflect alternate requirements + # defer the missing properties from the other schema (but only properties, to not override requirements) + defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} + local_schema.update(defer_schema) + return local_schema + + # Backward compatible data-input that allows values to be nested under 'data' or 'value' fields, # both for literal values and link references, for inputs submitted as list-items. # Also allows the explicit 'href' (+ optional format) reference for a link. @@ -3962,6 +4002,8 @@ class ExecuteInputAnyType(OneOfKeywordSchema): ExecuteReference(), # HTTP reference to a 'collection' with optional processing arguments ExecuteCollectionInput(), + # Nested Process with its own inputs and outputs + ExecuteNestedProcessInput(), ] @@ -4149,7 +4191,7 @@ class ExecuteInputInlineOrRefData(OneOfKeywordSchema): ExecuteInputQualifiedValue(), # {"value": , "mediaType": "<>", "schema": } ExecuteInputFile(), # 'href' with either 'type' (OGC) or 'format' (OLD) ExecuteCollectionInput(), # 'collection' with optional processing operations - # FIXME: 'nested process' (https://github.com/crim-ca/weaver/issues/412) + ExecuteNestedProcessInput(), # 'process' with nested 'inputs', 'outputs', etc. ] @@ -4231,8 +4273,12 @@ class ExecuteInputOutputs(ExtendedMappingSchema): ) -class Execute(ExecuteInputOutputs): - # OGC 'execute.yaml' does not enforce any required item +class ExecuteParameters(ExecuteInputOutputs): + """ + Basic execution parameters that can be submitted to run a process. + + These parameters can be either for a top-level process job, or any nested process call. + """ _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" examples = { "ExecuteJSON": { @@ -4240,22 +4286,7 @@ class Execute(ExecuteInputOutputs): "value": EXAMPLES["job_execute.json"], }, } - process = ProcessURL( - missing=drop, - description=( - "Process reference to be executed. " - "This parameter is required if the process cannot be inferred from the request endpoint." - ), - ) title = JobTitle(missing=drop) - status = JobStatusCreate( - description=( - "Status to request creation of the job without submitting it to processing queue " - "and leave it pending until triggered by another results request to start it " - "(see *OGC API - Processes* - Part 4: Job Management)." - ), - missing=drop, - ) mode = JobExecuteModeEnum( missing=drop, default=ExecuteMode.AUTO, @@ -4275,6 +4306,32 @@ class Execute(ExecuteInputOutputs): subscribers = JobExecuteSubscribers(missing=drop) +class Execute(ExecuteParameters): + """ + Main execution parameters that can be submitted to run a process. + + Additional parameters are only applicable to the top-most process in a nested definition. + """ + # OGC 'execute.yaml' does not enforce any required item + description = "Process execution parameters." + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/execute.yaml" + process = ProcessURL( + missing=drop, + description=( + "Process reference to be executed. " + "This parameter is required if the process cannot be inferred from the request endpoint." + ), + ) + status = JobStatusCreate( + description=( + "Status to request creation of the job without submitting it to processing queue " + "and leave it pending until triggered by another results request to start it " + "(see *OGC API - Processes* - Part 4: Job Management)." + ), + missing=drop, + ) + + class QuoteStatusSchema(ExtendedSchemaNode): schema_type = String validator = OneOf(QuoteStatus.values())