Commit db63cf3

[wip] add input 'properties' modifier support (relates to #750)

fmigneault-crim committed Nov 4, 2024
1 parent cedb286 commit db63cf3
Showing 9 changed files with 240 additions and 23 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -61,6 +61,7 @@ reports

## PyCharm
*.idea
*.run

## Intellij
*.iml
1 change: 1 addition & 0 deletions .gitignore
@@ -63,6 +63,7 @@ testdata.json

## PyCharm
*.idea
*.run

## Intellij
*.iml
10 changes: 5 additions & 5 deletions weaver/processes/builtin/__init__.py
@@ -9,7 +9,7 @@
from cwltool.command_line_tool import CommandLineTool
from cwltool.docker import DockerCommandLineJob
from cwltool.job import CommandLineJob, JobBase
from cwltool.singularity import SingularityCommandLineJob
#from cwltool.singularity import SingularityCommandLineJob

from weaver import WEAVER_ROOT_DIR
from weaver.compat import cache
@@ -236,8 +236,8 @@ class BuiltinProcessJobDocker(BuiltinProcessJobBase, DockerCommandLineJob):
pass


class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob):
pass
# class BuiltinProcessJobSingularity(BuiltinProcessJobBase, SingularityCommandLineJob):
# pass


# pylint: disable=W0221,W0237 # naming using python like arguments
@@ -247,6 +247,6 @@ def make_job_runner(self, runtime_context):
job = super(BuiltinProcess, self).make_job_runner(runtime_context)
if issubclass(job, DockerCommandLineJob):
return BuiltinProcessJobDocker
if issubclass(job, SingularityCommandLineJob):
return BuiltinProcessJobSingularity
# if issubclass(job, SingularityCommandLineJob):
# return BuiltinProcessJobSingularity
return BuiltinProcessJobBase
1 change: 0 additions & 1 deletion weaver/processes/builtin/collection_processor.cwl
@@ -1,5 +1,4 @@
#! /usr/bin/env cwl-runner
# based on: https://github.com/opengeospatial/ogcapi-processes/blob/8c41db3f/core/examples/json/ProcessDescription.json
cwlVersion: v1.0
class: CommandLineTool
id: collection_processor
2 changes: 1 addition & 1 deletion weaver/processes/builtin/collection_processor.py
@@ -266,7 +266,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
def process_cwl(collection_input, input_definition, output_dir):
# type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap
files = process_collection(collection_input, input_definition, output_dir)
outputs = {"outputs": files} # 'outputs' must match ID used in CWL definition
outputs = {"referenceOutput": files} # output ID must match the one used in CWL definition
with open(os.path.join(output_dir, OUTPUT_CWL_JSON), mode="w", encoding="utf-8") as file:
json.dump(outputs, file)
return outputs
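For context, these builtin scripts report their results to cwltool through the 'cwl.output.json' convention: the JSON written above must be keyed by the output ID declared in the corresponding CWL definition ('referenceOutput' here). A minimal sketch of what such a file could contain, with hypothetical paths and formats:

import json
import os

outdir = "/tmp/outdir"  # hypothetical CWL runtime output directory
os.makedirs(outdir, exist_ok=True)

# hypothetical staged files, shaped like the result of process_collection()
files = [
    {
        "class": "File",
        "path": os.path.join(outdir, "item-1.json"),
        "format": "https://www.iana.org/assignments/media-types/application/geo+json",
    },
]
# the key must match the output ID declared in 'collection_processor.cwl'
outputs = {"referenceOutput": files}
with open(os.path.join(outdir, "cwl.output.json"), mode="w", encoding="utf-8") as file:
    json.dump(outputs, file)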
32 changes: 32 additions & 0 deletions weaver/processes/builtin/properties_processor.cwl
@@ -0,0 +1,32 @@
#! /usr/bin/env cwl-runner
cwlVersion: v1.0
class: CommandLineTool
id: properties_processor
label: Properties Processor
doc: |
Generates properties contents using the specified input definitions.
# target the installed python pointing to weaver conda env to allow imports
baseCommand: ${WEAVER_ROOT_DIR}/bin/python
arguments: ["${WEAVER_ROOT_DIR}/weaver/processes/builtin/properties_processor.py", "-o", $(runtime.outdir)]
inputs:
properties:
doc: Properties definition submitted to the process and to be generated from input values.
type: File
format: "iana:application/json"
inputBinding:
prefix: -P
values:
doc: Values available for properties generation.
type: File
format: "iana:application/json"
inputBinding:
prefix: -V
outputs:
referenceOutput:
doc: Generated file contents from specified properties.
type: File
# note: important to omit 'format' here, since we want to preserve the flexibility to retrieve 'any' reference
outputBinding:
outputEval: $(runtime.outdir)/*
$namespaces:
iana: "https://www.iana.org/assignments/media-types/"
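To illustrate how the CWL wiring above maps onto the script's CLI, the following sketch invokes the processor by hand with hypothetical property definitions. It assumes a Weaver checkout with dependencies installed (the CWL definition uses the Weaver environment's Python), and note that the WIP evaluator currently only handles literal expressions, while the '-V' values file is loaded but not yet consumed by the computation:

import json
import subprocess
import tempfile

outdir = tempfile.mkdtemp()
props_path = f"{outdir}/properties.json"
values_path = f"{outdir}/values.json"

# hypothetical 'properties' definition: property name -> calculation expression (literal-only for now)
with open(props_path, mode="w", encoding="utf-8") as f:
    json.dump({"count": "3"}, f)
# hypothetical 'values' made available for the calculations
with open(values_path, mode="w", encoding="utf-8") as f:
    json.dump({"measurements": [4, 2, 8]}, f)

# mirrors the CWL 'baseCommand'/'arguments' plus the '-P'/'-V' input bindings above
proc = subprocess.run(
    ["python", "weaver/processes/builtin/properties_processor.py",
     "-P", props_path, "-V", values_path, "-o", outdir],
)
print(proc.returncode)
with open(f"{outdir}/cwl.output.json", encoding="utf-8") as f:
    print(f.read())  # expected shape: {"referenceOutput": {"class": "File", "path": ..., "format": ...}}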
156 changes: 156 additions & 0 deletions weaver/processes/builtin/properties_processor.py
@@ -0,0 +1,156 @@
#!/usr/bin/env python
"""
Generates properties contents using the specified input definitions.
"""
import argparse
import ast
import json
import logging
import os
import sys
import uuid
from typing import TYPE_CHECKING

CUR_DIR = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, CUR_DIR)
# root to allow 'from weaver import <...>'
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(CUR_DIR))))

# place weaver specific imports after sys path fixing to ensure they are found from external call
# pylint: disable=C0413,wrong-import-order
from weaver.formats import ContentType, get_cwl_file_format # isort:skip # noqa: E402
from weaver.processes.builtin.utils import get_package_details  # isort:skip # noqa: E402
from weaver.utils import Lazify, load_file, repr_json, request_extra # isort:skip # noqa: E402

if TYPE_CHECKING:
from typing import Dict

from weaver.typedefs import (
CWL_IO_ValueMap,
JSON,
Path,
)
from weaver.utils import LoggerHandler

PACKAGE_NAME, PACKAGE_BASE, PACKAGE_MODULE = get_package_details(__file__)

# setup logger since it is not run from the main 'weaver' app
LOGGER = logging.getLogger(PACKAGE_MODULE)
LOGGER.addHandler(logging.StreamHandler(sys.stdout))
LOGGER.setLevel(logging.INFO)

# process details
__version__ = "1.0"
__title__ = "Properties Processor"
__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative

OUTPUT_CWL_JSON = "cwl.output.json"


def compute_property(property_name, calculation, properties):
# type: (str, str, Dict[str, JSON]) -> None

    ...  # FIXME: use 'ast' to evaluate safely - TBD: what about a property pointing at a file?
    calc = calculation.lower()  # handle 'Min()'->'min()' since only names of "well-known functions" are allowed
    result = ast.literal_eval(calc)  # note: only literal expressions are handled for now, function calls are not evaluated yet
    properties.update({property_name: result})


def process_properties(input_properties, input_values, output_dir, logger=LOGGER):
# type: (Dict[str, str], Dict[str, JSON], Path, LoggerHandler) -> JSON
"""
    Processor of a ``properties`` definition for an input or output.

    :param input_properties:
        Properties definition submitted to the process and to be generated from input values.
    :param input_values:
        Values available for properties generation.
    :param output_dir: Directory to write the output (provided by the :term:`CWL` definition).
    :param logger: Optional logger handler to employ.
    :return: Mapping of the resolved property values.
"""
    logger.log(  # pylint: disable=E1205 # false positive
        logging.INFO,
        "Process [{}] Got arguments: input_properties={}, input_values={}, output_dir=[{}]",
        PACKAGE_NAME,
        Lazify(lambda: repr_json(input_properties, indent=2)),
        Lazify(lambda: repr_json(input_values, indent=2)),
        output_dir,
    )
os.makedirs(output_dir, exist_ok=True)

    # sort properties such that those depending on other ones are computed later (the fewest dependencies first)
    props_deps = {prop: 0 for prop in input_properties}
    for prop, calc in input_properties.items():
        for prop_dep in props_deps:
            if prop == prop_dep:
                if prop in calc:
                    raise ValueError(f"Invalid recursive property [{prop}] references itself.")
                continue
            if prop_dep in calc:
                props_deps[prop] += 1  # 'prop' depends on 'prop_dep', compute it later
    if not any(count == 0 for count in props_deps.values()):
        raise ValueError("Invalid properties that all depend on one another. Impossible resolution order.")
    props = sorted(
        list(input_properties.items()),
        key=lambda p: props_deps[p[0]]
    )

# compute the properties
properties = {}
for prop, calc in props:
compute_property(prop, calc, properties)

return properties


def process_cwl(input_properties, input_values, output_dir):
# type: (Dict[str, str], Dict[str, JSON], Path) -> CWL_IO_ValueMap
out_props = process_properties(input_properties, input_values, output_dir)
prop_file_path = os.path.join(output_dir, f"{uuid.uuid4()}.json")
with open(prop_file_path, mode="w", encoding="utf-8") as prop_file:
json.dump(out_props, prop_file, indent=2)
out_cwl_file = {
"class": "File",
"path": prop_file_path,
"format": get_cwl_file_format(ContentType.APP_JSON),
}
cwl_outputs = {"referenceOutput": out_cwl_file} # output ID must match the one used in CWL definition
cwl_out_path = os.path.join(output_dir, OUTPUT_CWL_JSON)
with open(cwl_out_path, mode="w", encoding="utf-8") as file:
json.dump(cwl_outputs, file)
return cwl_outputs


def main(*args):
# type: (*str) -> None
LOGGER.info("Process [%s] Parsing inputs...", PACKAGE_NAME)
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"-P", "--properties",
metavar="INPUT_PROPERTIES",
required=True,
help="Properties definition submitted to the process and to be generated from input values.",
)
parser.add_argument(
"-V", "--values",
metavar="INPUT_VALUES",
required=True,
help="Values available for properties generation.",
)
parser.add_argument(
"-o", "--outdir",
metavar="OUTDIR",
required=True,
help="Output directory of the retrieved data.",
)
ns = parser.parse_args(*args)
LOGGER.info("Process [%s] Loading properties input from file '%s'.", PACKAGE_NAME, ns.properties)
prop_in = load_file(ns.properties)
LOGGER.info("Process [%s] Loading values input from file '%s'.", PACKAGE_NAME, ns.values)
val_in = load_file(ns.values)
    sys.exit(0 if process_cwl(prop_in, val_in, ns.outdir) is not None else 1)


if __name__ == "__main__":
main()
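A tiny worked example of the ordering step in process_properties above, with hypothetical property names and expressions, assuming properties that reference other properties must be computed after them (note that dependency detection is a plain substring match on the property name):

input_properties = {"double_total": "total * 2", "total": "42"}  # hypothetical definitions

props_deps = {prop: 0 for prop in input_properties}
for prop, calc in input_properties.items():
    for other in props_deps:
        if other != prop and other in calc:
            props_deps[prop] += 1  # 'prop' references 'other', so compute it later

props = sorted(input_properties.items(), key=lambda p: props_deps[p[0]])
print(props)  # [('total', '42'), ('double_total', 'total * 2')]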
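The FIXME in compute_property hints at evaluating the "well-known functions" safely through the 'ast' module. The following is only a minimal sketch of that idea, assuming a hypothetical whitelist of functions, literal arguments, basic arithmetic, and name lookups against the submitted values; it is not the implementation this commit intends:

import ast
import operator

# hypothetical whitelist of the "well-known functions" mentioned in the FIXME
ALLOWED_FUNCS = {"min": min, "max": max, "sum": sum, "len": len, "abs": abs}
BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul, ast.Div: operator.truediv}


def safe_eval_property(calculation, values):
    """Evaluate a property expression restricted to literals, whitelisted calls, and value lookups."""
    tree = ast.parse(calculation.lower(), mode="eval").body

    def _eval(node):
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.Name):
            return values[node.id]  # resolve bare names against the submitted input values
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id in ALLOWED_FUNCS:
            return ALLOWED_FUNCS[node.func.id](*[_eval(arg) for arg in node.args])
        if isinstance(node, ast.BinOp) and type(node.op) in BIN_OPS:
            return BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        raise ValueError(f"Unsupported expression node: {ast.dump(node)}")

    return _eval(tree)


print(safe_eval_property("Min(samples) + 1", {"samples": [4, 2, 8]}))  # -> 3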
27 changes: 23 additions & 4 deletions weaver/processes/execution.py
@@ -38,6 +38,7 @@
from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode
from weaver.processes import wps_package
from weaver.processes.builtin.collection_processor import process_collection
from weaver.processes.builtin.properties_processor import process_properties
from weaver.processes.constants import WPS_BOUNDINGBOX_DATA, WPS_COMPLEX_DATA, JobInputsOutputsSchema
from weaver.processes.convert import (
convert_input_values_schema,
@@ -595,16 +596,20 @@ def parse_wps_inputs(wps_process, job, container=None):
# this could refer to the desired collection ID rather than the input ID being mapped
input_info = dict(input_info) # not 'deepcopy' to avoid 'data' or 'value' copy that could be large
input_info["id"] = input_id

# collection reference
if isinstance(input_value, dict) and "collection" in input_value:
col_path = os.path.join(job.tmpdir, "inputs", input_id)
col_files = process_collection(input_value, input_info, col_path, logger=job)
resolved_inputs.extend([
resolved_input_values = [
(
{"href": col_file["path"], "type": map_cwl_media_type(col_file["format"])},
input_info
)
for col_file in col_files
])
]

# nested process reference
elif isinstance(input_value, dict) and "process" in input_value:
proc_uri = input_value["process"]
job_log_update_status_func(
Expand Down Expand Up @@ -632,9 +637,23 @@ def parse_wps_inputs(wps_process, job, container=None):
f"Abort execution. Cannot map multiple outputs from {proc_uri} "
f"to input [{input_id}] of [{job.process}]."
)
resolved_inputs.append((results[0], input_info))
resolved_input_values = [(results[0], input_info)]

# typical file/data
else:
resolved_inputs.append((input_value, input_info))
resolved_input_values = [(input_value, input_info)]

# post-handling of properties
properties = input_value.get("properties") if isinstance(input_value, dict) else None
if properties:
input_prop_path = os.path.join(job.tmpdir, "inputs", input_id)
input_prop_values = {input_id: resolved_input_values} # FIXME: handle other cross-input refs?
resolved_input_values = process_properties(
properties,
input_prop_values,
input_prop_path,
)
resolved_inputs.extend(resolved_input_values)

for input_value, input_info in resolved_inputs:
# if already resolved, skip parsing
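For context, the branch above activates when a submitted input value carries a 'properties' member. A hypothetical execution 'inputs' fragment that would reach it could look as follows (the input ID, collection URL, and property expressions are made up, and the exact syntax is still WIP per #750):

# hypothetical execution request 'inputs' fragment (illustration only, WIP syntax)
execute_inputs = {
    "items": {
        "collection": "https://example.com/collections/observations",
        # property name -> calculation expression, matching the Dict[str, str]
        # signature expected by 'process_properties'
        "properties": {
            "count": "3",
        },
    },
}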
33 changes: 21 additions & 12 deletions weaver/wps_restapi/swagger_definitions.py
@@ -3953,26 +3953,35 @@ class ExecuteNestedProcessInput(ExtendedMappingSchema):
# 'process' is required for a nested definition, otherwise it will not even be detected as one!
process = ProcessURL(description="Process reference to be executed.")

@colander.deferred
@staticmethod
def get_field(field):
return getattr(ExecuteInputValues(), field).clone()

inputs = get_field

def deserialize(self, cstruct):
"""
Defer deserialization validation to the class that contains the set of expected properties.
Additional properties that are added dynamically should "align" to reflect the :term:`OpenAPI` definition,
        although correspondence is not explicitly ensured.
"""
local_result = super().deserialize(cstruct)
defer_result = ExecuteParameters().deserialize(cstruct)
local_result.update(defer_result or {})
return local_result
self.bind()
return ExtendedMappingSchema.deserialize(self, cstruct)
# local_result = super().deserialize(cstruct)
# defer_result = ExecuteParameters().deserialize(cstruct)
# local_result.update(defer_result or {})
# return local_result

def convert_type(self, cstruct, dispatcher):
defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters())
local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self)
# local definitions take precedence to reflect alternate requirements
# defer the missing properties from the other schema (but only properties, to not override requirements)
defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()}
local_schema.update(defer_schema)
return local_schema
# def convert_type(self, cstruct, dispatcher):
# defer_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(ExecuteParameters())
# local_schema = ExtendedObjectTypeConverter(dispatcher).convert_type(self)
# # local definitions take precedence to reflect alternate requirements
# # defer the missing properties from the other schema (but only properties, to not override requirements)
# defer_schema = {field: schema for field, schema in defer_schema.items() if "properties" in field.lower()}
# local_schema.update(defer_schema)
# return local_schema


# Backward compatible data-input that allows values to be nested under 'data' or 'value' fields,