From db63cf3db953af119f2c85adf509003f87823776 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 4 Nov 2024 10:37:25 -0500 Subject: [PATCH] [wip] add input 'properties' modifier support (relates to https://github.com/crim-ca/weaver/issues/750) --- .dockerignore | 1 + .gitignore | 1 + weaver/processes/builtin/__init__.py | 10 +- .../builtin/collection_processor.cwl | 1 - .../processes/builtin/collection_processor.py | 2 +- .../builtin/properties_processor.cwl | 32 ++++ .../processes/builtin/properties_processor.py | 156 ++++++++++++++++++ weaver/processes/execution.py | 27 ++- weaver/wps_restapi/swagger_definitions.py | 33 ++-- 9 files changed, 240 insertions(+), 23 deletions(-) create mode 100644 weaver/processes/builtin/properties_processor.cwl create mode 100644 weaver/processes/builtin/properties_processor.py diff --git a/.dockerignore b/.dockerignore index 84f3d026b..c01312ef7 100644 --- a/.dockerignore +++ b/.dockerignore @@ -61,6 +61,7 @@ reports ## PyCharm *.idea +*.run ## Intellij *.iml diff --git a/.gitignore b/.gitignore index b92754eec..a8a853161 100644 --- a/.gitignore +++ b/.gitignore @@ -63,6 +63,7 @@ testdata.json ## PyCharm *.idea +*.run ## Intellij *.iml diff --git a/weaver/processes/builtin/__init__.py b/weaver/processes/builtin/__init__.py index b7d9440f3..042ef802b 100644 --- a/weaver/processes/builtin/__init__.py +++ b/weaver/processes/builtin/__init__.py @@ -9,7 +9,7 @@ from cwltool.command_line_tool import CommandLineTool from cwltool.docker import DockerCommandLineJob from cwltool.job import CommandLineJob, JobBase -from cwltool.singularity import SingularityCommandLineJob +#from cwltool.singularity import SingularityCommandLineJob from weaver import WEAVER_ROOT_DIR from weaver.compat import cache @@ -236,8 +236,8 @@ class BuiltinProcessJobDocker(BuiltinProcessJobBase, DockerCommandLineJob): pass -class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob): - pass +# class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob): +# pass # pylint: disable=W0221,W0237 # naming using python like arguments @@ -247,6 +247,6 @@ def make_job_runner(self, runtime_context): job = super(BuiltinProcess, self).make_job_runner(runtime_context) if issubclass(job, DockerCommandLineJob): return BuiltinProcessJobDocker - if issubclass(job, SingularityCommandLineJob): - return BuiltinProcessJobSingularity + # if issubclass(job, SingularityCommandLineJob): + # return BuiltinProcessJobSingularity return BuiltinProcessJobBase diff --git a/weaver/processes/builtin/collection_processor.cwl b/weaver/processes/builtin/collection_processor.cwl index a5782f7be..805671b6c 100644 --- a/weaver/processes/builtin/collection_processor.cwl +++ b/weaver/processes/builtin/collection_processor.cwl @@ -1,5 +1,4 @@ #! /usr/bin/env cwl-runner -# based on: https://github.com/opengeospatial/ogcapi-processes/blob/8c41db3f/core/examples/json/ProcessDescription.json cwlVersion: v1.0 class: CommandLineTool id: collection_processor diff --git a/weaver/processes/builtin/collection_processor.py b/weaver/processes/builtin/collection_processor.py index f87783705..3fe2d3d8b 100644 --- a/weaver/processes/builtin/collection_processor.py +++ b/weaver/processes/builtin/collection_processor.py @@ -266,7 +266,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO def process_cwl(collection_input, input_definition, output_dir): # type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap files = process_collection(collection_input, input_definition, output_dir) - outputs = {"outputs": files} # 'outputs' must match ID used in CWL definition + outputs = {"referenceOutput": files} # output ID must match the one used in CWL definition with open(os.path.join(output_dir, OUTPUT_CWL_JSON), mode="w", encoding="utf-8") as file: json.dump(outputs, file) return outputs diff --git a/weaver/processes/builtin/properties_processor.cwl b/weaver/processes/builtin/properties_processor.cwl new file mode 100644 index 000000000..30d7a715b --- /dev/null +++ b/weaver/processes/builtin/properties_processor.cwl @@ -0,0 +1,32 @@ +#! /usr/bin/env cwl-runner +cwlVersion: v1.0 +class: CommandLineTool +id: properties_processor +label: Properties Processor +doc: | + Generates properties contents using the specified input definitions. +# target the installed python pointing to weaver conda env to allow imports +baseCommand: ${WEAVER_ROOT_DIR}/bin/python +arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)] +inputs: + properties: + doc: Properties definition submitted to the process and to be generated from input values. + type: File + format: "iana:application/json" + inputBinding: + prefix: -P + values: + doc: Values available for properties generation. + type: File + format: "iana:application/json" + inputBinding: + prefix: -V +outputs: + referenceOutput: + doc: Generated file contents from specified properties. + type: File + # note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference + outputBinding: + outputEval: $(runtime.outdir)/* +$namespaces: + iana: "https://www.iana.org/assignments/media-types/" diff --git a/weaver/processes/builtin/properties_processor.py b/weaver/processes/builtin/properties_processor.py new file mode 100644 index 000000000..71456f498 --- /dev/null +++ b/weaver/processes/builtin/properties_processor.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python +""" +Generates properties contents using the specified input definitions. +""" +import argparse +import ast +import json +import logging +import os +import sys +import uuid +from typing import TYPE_CHECKING + +CUR_DIR = os.path.abspath(os.path.dirname(__file__)) +sys.path.insert(0, CUR_DIR) +# root to allow 'from weaver import <...>' +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR)))) + +# place weaver specific imports after sys path fixing to ensure they are found from external call +# pylint: disable=C0413,wrong-import-order +from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402 +from weaver.processes.builtin.utils import get_package_details # isort:skip # noqa: E402) +from weaver.utils import Lazify, load_file, repr_json, request_extra # isort:skip # noqa: E402 + +if TYPE_CHECKING: + from typing import Dict + + from weaver.typedefs import ( + CWL_IO_ValueMap, + JSON, + Path, + ) + from weaver.utils import LoggerHandler + +PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__) + +# setup logger since it is not run from the main 'weaver' app +LOGGER = logging.getLogger(PACKAGE_MODULE) +LOGGER.addHandler(logging.StreamHandler(sys.stdout)) +LOGGER.setLevel(logging.INFO) + +# process details +__version__ = "1.0" +__title__ = "Properties Processor" +__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative + +OUTPUT_CWL_JSON = "cwl.output.json" + + +def compute_property(property_name, calculation, properties): + # type: (str, str, Dict[str, JSON]) -> None + + ... # FIXME: ast to do eval safely - TBD: what about property pointing at file? + calc = calculation.lower() # handle 'Min()'->'min()' - names allowed by "well-known functions" + result = ast.literal_eval(calc) + properties.update({property_name: result}) + + +def process_properties(input_properties, input_values, output_dir, logger=LOGGER): + # type: (Dict[str, str], Dict[str, JSON], Path, LoggerHandler) -> JSON + """ + Processor of a ``properties`` definition for an input or output. + + :param input_properties: + Properties definition submitted to the process and to be generated from input values. + :param input_values: + Values available for properties generation. + :param output_dir: Directory to write the output (provided by the :term:`CWL` definition). + :param logger: Optional logger handler to employ. + :return: File reference containing the resolved properties. + """ + logger.log( # pylint: disable=E1205 # false positive + logging.INFO, + "Process [{}] Got arguments: input_properties={}, input_values={} output_dir=[{}]", + PACKAGE_NAME, + Lazify(lambda: repr_json(input_properties, indent=2)), + Lazify(lambda: repr_json(input_values, indent=2)), + output_dir, + ) + os.makedirs(output_dir, exist_ok=True) + + # sort properties later if they depend on other ones, the least dependencies to be computed first + props_deps = {prop: 0 for prop in input_properties} + for prop, calc in input_properties.items(): + for prop_dep in props_deps: + if prop == prop_dep: + if prop in calc: + raise ValueError(f"Invalid recursive property [{prop}] references itself.") + continue + if prop_dep in calc: + props_deps[prop_dep] += 1 + if not filter(lambda dep: dep[-1] == 0, props_deps.items()): + raise ValueError("Invalid properties all depend on another one. Impossible resolution order.") + props = sorted( + list(input_properties.items()), + key=lambda p: props_deps[p[0]] + ) + + # compute the properties + properties = {} + for prop, calc in props: + compute_property(prop, calc, properties) + + return properties + + +def process_cwl(input_properties, input_values, output_dir): + # type: (Dict[str, str], Dict[str, JSON], Path) -> CWL_IO_ValueMap + out_props = process_properties(input_properties, input_values, output_dir) + prop_file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json") + with open(prop_file_path, mode="w", encoding="utf-8") as prop_file: + json.dump(out_props, prop_file, indent=2) + out_cwl_file = { + "class": "File", + "path": prop_file_path, + "format": get_cwl_file_format(ContentType.APP_JSON), + } + cwl_outputs = {"referenceOutput": out_cwl_file} # output ID must match the one used in CWL definition + cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON) + with open(cwl_out_path, mode="w", encoding="utf-8") as file: + json.dump(cwl_outputs, file) + return cwl_outputs + + +def main(*args): + # type: (*str) -> None + LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME) + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "-P", "--properties", + metavar="INPUT_PROPERTIES", + required=True, + help="Properties definition submitted to the process and to be generated from input values.", + ) + parser.add_argument( + "-V", "--values", + metavar="INPUT_VALUES", + required=True, + help="Values available for properties generation.", + ) + parser.add_argument( + "-o", "--outdir", + metavar="OUTDIR", + required=True, + help="Output directory of the retrieved data.", + ) + ns = parser.parse_args(*args) + LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.properties) + prop_in = load_file(ns.properties) + LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values) + val_in = load_file(ns.values) + sys.exit(process_cwl(prop_in, val_in, ns.outdir) is not None) + + +if __name__ == "__main__": + main() diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index ecdb0a1c4..0b8cb2fe3 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -38,6 +38,7 @@ from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package from weaver.processes.builtin.collection_processor import process_collection +from weaver.processes.builtin.properties_processor import process_properties from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema from weaver.processes.convert import ( convert_input_values_schema, @@ -595,16 +596,20 @@ def parse_wps_inputs(wps_process, job, container=None): # this could refer to the desired collection ID rather than the input ID being mapped input_info = dict(input_info) # not 'deepcopy' to avoid 'data' or 'value' copy that could be large input_info["id"] = input_id + + # collection reference if isinstance(input_value, dict) and "collection" in input_value: col_path = os.path.join(job.tmpdir, "inputs", input_id) col_files = process_collection(input_value, input_info, col_path, logger=job) - resolved_inputs.extend([ + resolved_input_values = [ ( {"href": col_file["path"], "type": map_cwl_media_type(col_file["format"])}, input_info ) for col_file in col_files - ]) + ] + + # nested process reference elif isinstance(input_value, dict) and "process" in input_value: proc_uri = input_value["process"] job_log_update_status_func( @@ -632,9 +637,23 @@ def parse_wps_inputs(wps_process, job, container=None): f"Abort execution. Cannot map multiple outputs from {proc_uri} " f"to input [{input_id}] of [{job.process}]." ) - resolved_inputs.append((results[0], input_info)) + resolved_input_values = [(results[0], input_info)] + + # typical file/data else: - resolved_inputs.append((input_value, input_info)) + resolved_input_values = [(input_value, input_info)] + + # post-handling of properties + properties = input_value.get("properties") if isinstance(input_value, dict) else None + if properties: + input_prop_path = os.path.join(job.tmpdir, "inputs", input_id) + input_prop_values = {input_id: resolved_input_values} # FIXME: handle other cross-input refs? + resolved_input_values = process_properties( + properties, + input_prop_values, + input_prop_path, + ) + resolved_inputs.extend(resolved_input_values) for input_value, input_info in resolved_inputs: # if already resolved, skip parsing diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index f9c9d21ca..8979edce6 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -3953,6 +3953,13 @@ class ExecuteNestedProcessInput(ExtendedMappingSchema): # 'process' is required for a nested definition, otherwise it will not even be detected as one! process = ProcessURL(description="Process reference to be executed.") + @colander.deferred + @staticmethod + def get_field(field): + return getattr(ExecuteInputValues(), field).clone() + + inputs = get_field + def deserialize(self, cstruct): """ Defer deserialization validation to the class that contains the set of expected properties. @@ -3960,19 +3967,21 @@ def deserialize(self, cstruct): Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition, although correspondance is not explicitly ensured. """ - local_result = super().deserialize(cstruct) - defer_result = ExecuteParameters().deserialize(cstruct) - local_result.update(defer_result or {}) - return local_result + self.bind() + return ExtendedMappingSchema.deserialize(self, cstruct) + # local_result = super().deserialize(cstruct) + # defer_result = ExecuteParameters().deserialize(cstruct) + # local_result.update(defer_result or {}) + # return local_result - def convert_type(self, cstruct, dispatcher): - defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) - local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) - # local definitions take precedence to reflect alternate requirements - # defer the missing properties from the other schema (but only properties, to not override requirements) - defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} - local_schema.update(defer_schema) - return local_schema + # def convert_type(self, cstruct, dispatcher): + # defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters()) + # local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self) + # # local definitions take precedence to reflect alternate requirements + # # defer the missing properties from the other schema (but only properties, to not override requirements) + # defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()} + # local_schema.update(defer_schema) + # return local_schema # Backward compatible data-input that allows values to be nested under 'data' or 'value' fields,