Skip to content

Commit

Permalink
support Job type and processID filter queries (relates to #268) + pro…
Browse files Browse the repository at this point in the history
…vide Job status type field (resolves #351) + increate OGC conformance (relates to #53, #231)
  • Loading branch information
fmigneault committed Nov 4, 2021
1 parent c7990fc commit 87c421b
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 40 deletions.
8 changes: 6 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ Changes

Changes:
--------
- No change.
- Add support of ``type`` and ``processID`` query parameters for ``Job`` listing
(resolves some tasks in `#268 <https://github.com/crim-ca/weaver/issues/268>`_).
- Add ``type`` field to ``Job`` status information
(resolves `#351 <https://github.com/crim-ca/weaver/issues/351>`_).
- Add `OGC-API - Processes` conformance references regarding supported operations for ``Job`` listing and filtering.

Fixes:
------
- No change.
- Allow ``group`` query parameter to handle ``Job`` category listing with ``provider`` as ``service`` alias.

`4.2.1 <https://github.com/crim-ca/weaver/tree/4.2.1>`_ (2021-10-20)
========================================================================
Expand Down
3 changes: 3 additions & 0 deletions docs/source/references.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
.. |submit-issue| replace:: submit a new issue
.. _submit-issue: https://github.com/crim-ca/weaver/issues/new/choose

.. OGC-API references
.. |ogc-statusInfo| replace:: https://raw.githubusercontent.com/opengeospatial/ogcapi-processes/master/core/openapi/schemas/statusInfo.yaml

.. Example references
.. |examples| replace:: Examples
.. _examples: examples.rst
Expand Down
173 changes: 152 additions & 21 deletions tests/wps_restapi/test_jobs.py

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,20 @@ def process(self, process):
raise TypeError("Type 'str' is required for '{}.process'".format(type(self)))
self["process"] = process

@property
def type(self):
# type: () -> str
"""
Obtain the type of the element associated to the creation of this job.
.. seealso::
- Defined in |ogc-statusInfo|
- Queried with https://docs.ogc.org/DRAFTS/18-062.html#_parameter_type
"""
if self.service is None:
return "process"
return "provider"

def _get_inputs(self):
# type: () -> List[Optional[Dict[str, Any]]]
if self.get("inputs") is None:
Expand Down Expand Up @@ -1024,6 +1038,7 @@ def json(self, container=None, self_link=None): # pylint: disable=W0221,argu
"jobID": self.id,
"processID": self.process,
"providerID": self.service,
"type": self.type,
"status": map_status(self.status),
"message": self.status_message,
"created": self.created,
Expand Down
39 changes: 34 additions & 5 deletions weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
from pymongo.collection import Collection

from weaver.store.base import DatetimeIntervalType, JobCategoriesAndCount, JobListAndCount
from weaver.typedefs import AnyProcess, AnyProcessType
from weaver.typedefs import AnyProcess, AnyProcessType, AnyValue

AnyValueMongo = Union[AnyValue, datetime.datetime]
SearchFilterMongo = Union[AnyValueMongo, Dict[str, Union[AnyValueMongo, List[AnyValueMongo]]]]

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -542,6 +545,7 @@ def list_jobs(self):
def find_jobs(self,
process=None, # type: Optional[str]
service=None, # type: Optional[str]
type=None, # type: Optional[str]
tags=None, # type: Optional[List[str]]
access=None, # type: Optional[str]
notification_email=None, # type: Optional[str]
Expand Down Expand Up @@ -583,6 +587,7 @@ def find_jobs(self,
:param request: request that lead to this call to obtain permissions and user id.
:param process: process name to filter matching jobs.
:param service: service name to filter matching jobs.
:param type: filter matching jobs for given type.
:param tags: list of tags to filter matching jobs.
:param access: access visibility to filter matching jobs (default: :py:data:`VISIBILITY_PUBLIC`).
:param notification_email: notification email to filter matching jobs.
Expand All @@ -603,7 +608,7 @@ def find_jobs(self,
"locator": "tags",
})

search_filters = {}
search_filters = {} # type: Dict[str, SearchFilterMongo]

if not request:
search_filters.setdefault("access", VISIBILITY_PUBLIC)
Expand All @@ -623,17 +628,32 @@ def find_jobs(self,
search_filters["tags"] = {"$all": tags}

if status in JOB_STATUS_CATEGORIES:
search_filters["status"] = {"$in": JOB_STATUS_CATEGORIES[status]}
category_statuses = list(JOB_STATUS_CATEGORIES[status])
search_filters["status"] = {"$in": category_statuses}
elif status:
search_filters["status"] = status

if notification_email is not None:
search_filters["notification_email"] = notification_email

if type == "process":
search_filters["service"] = None
elif type == "provider":
search_filters["service"] = {"$ne": None}

if process is not None:
# if (type=provider and process=<id>)
# doesn't contradict since it can be more specific about sub-process of service
search_filters["process"] = process

if service is not None:
# can override 'service' set by 'type' to be more specific, but must be logical
# (e.g.: type=process and service=<name> cannot ever yield anything)
if search_filters.get("service", -1) is None:
raise JobInvalidParameter(json={
"description": "Ambiguous type requested contradicts with requested service provider.",
"cause": {"service": service, "type": type}
})
search_filters["service"] = service

if datetime is not None:
Expand All @@ -656,8 +676,9 @@ def find_jobs(self,
sort = "user_id"
if sort not in JOB_SORT_VALUES:
raise JobInvalidParameter(json={
"description": "Invalid sorting method: '{}'".format(repr(sort)),
"locator": "sort"
"description": "Invalid sorting method.",
"cause": "sort",
"value": str(sort),
})
sort_order = DESCENDING if sort in (SORT_FINISHED, SORT_CREATED) else ASCENDING
sort_criteria = {sort: sort_order}
Expand All @@ -668,6 +689,10 @@ def find_jobs(self,
# results by group categories
if group_by:
group_by = [group_by] if isinstance(group_by, str) else group_by # type: List[str]
has_provider = "provider" in group_by
if has_provider:
group_by.remove("provider")
group_by.append("service")
group_categories = {field: "$" + field for field in group_by} # fields that can generate groups
pipeline.extend([{
"$group": {
Expand All @@ -685,6 +710,10 @@ def find_jobs(self,
found = self.collection.aggregate(pipeline)
items = [{k: (v if k != "jobs" else [Job(j) for j in v]) # convert to Job object where applicable
for k, v in i.items()} for i in found]
if has_provider:
for group_result in items:
group_service = group_result["category"].pop("service", None)
group_result["category"]["provider"] = group_service

# results with paging
else:
Expand Down
7 changes: 7 additions & 0 deletions weaver/wps_restapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,13 @@ def api_conformance(request): # noqa: F811
# ogcapi_processes + "/req/core/job-exception-no-such-job",
# ogcapi_processes + "/req/core/job-results-exception/no-such-job",
ogcapi_processes + "/req/job-list/links",
ogcapi_processes + "/rec/job-list/job-list-landing-page",
ogcapi_processes + "/req/job-list/job-list-op",
ogcapi_processes + "/req/job-list/processID-definition",
ogcapi_processes + "/req/job-list/processID-mandatory",
ogcapi_processes + "/req/job-list/processid-response",
ogcapi_processes + "/req/job-list/type-definition",
ogcapi_processes + "/req/job-list/type-response",
# FIXME:
# https://github.com/opengeospatial/ogcapi-processes/blob/master/core/requirements/core/REQ_job-results-exception-results-not-ready.adoc
# type of exception SHALL be "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready"
Expand Down
25 changes: 17 additions & 8 deletions weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,16 @@ def validate_service_process(request):
"""
Verifies that service or process specified by path or query will raise the appropriate error if applicable.
"""
service_name = request.matchdict.get("provider_id", None) or request.params.get("service", None)
process_name = request.matchdict.get("process_id", None) or request.params.get("process", None)
service_name = (
request.matchdict.get("provider_id", None) or
request.params.get("provider", None) or
request.params.get("service", None) # backward compatibility
)
process_name = (
request.matchdict.get("process_id", None) or
request.params.get("process", None) or
request.params.get("processID", None) # OGC-API conformance
)
item_test = None
item_type = None

Expand Down Expand Up @@ -298,16 +306,17 @@ def get_queried_jobs(request):

settings = get_settings(request)
service, process = validate_service_process(request)
if service:
forbid_local_only(settings)

filters = {**request.params, "process": process, "provider": service}
params = dict(request.params)
for param_name in ["process", "processID", "provider", "service"]:
params.pop(param_name, None)
filters = {**params, "process": process, "provider": service}

filters["detail"] = asbool(request.params.get("detail"))
filters["detail"] = asbool(params.get("detail"))

if request.params.get("datetime", False):
if params.get("datetime", False):
# replace white space with '+' since request.params replaces '+' with whitespaces when parsing
filters["datetime"] = request.params["datetime"].replace(" ", "+")
filters["datetime"] = params["datetime"].replace(" ", "+")

try:
filters = sd.GetJobsQueries().deserialize(filters)
Expand Down
24 changes: 20 additions & 4 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,14 @@ class JobStatusEnum(ExtendedSchemaNode):
validator = OneOf(JOB_STATUS_CODE_API)


class JobTypeEnum(ExtendedSchemaNode):
schema_type = String
title = "JobType"
default = null
example = "process"
validator = OneOf(["process", "provider", "service"])


class JobSortEnum(ExtendedSchemaNode):
schema_type = String
title = "JobSortingMethod"
Expand Down Expand Up @@ -2474,6 +2482,7 @@ class JobStatusInfo(ExtendedMappingSchema):
description="Process identifier corresponding to the job execution.")
providerID = ProcessIdentifier(missing=None, default=None,
description="Provider identifier corresponding to the job execution.")
type = JobTypeEnum(description="Type of the element associated to the creation of this job.")
status = JobStatusEnum(description="Last updated status.")
message = ExtendedSchemaNode(String(), missing=drop, description="Information about the last status update.")
created = ExtendedSchemaNode(DateTime(), missing=drop, default=None,
Expand Down Expand Up @@ -2557,8 +2566,8 @@ class GetGroupedJobsSchema(ExtendedMappingSchema):

class GetQueriedJobsSchema(OneOfKeywordSchema):
_one_of = [
GetPagingJobsSchema,
GetGroupedJobsSchema,
GetPagingJobsSchema(description="Matched jobs according to filter queries."),
GetGroupedJobsSchema(description="Matched jobs grouped by specified categories."),
]
total = ExtendedSchemaNode(Integer(),
description="Total number of matched jobs regardless of grouping or paging result.")
Expand Down Expand Up @@ -3492,6 +3501,9 @@ class PostProcessJobsEndpoint(ProcessPath):


class GetJobsQueries(ExtendedMappingSchema):
# note:
# This schema is also used to generate any missing defaults during filter parameter handling.
# Items with default value are added if omitted, except 'default=null' which are removed after handling by alias.
detail = ExtendedSchemaNode(Boolean(), description="Provide job details instead of IDs.",
default=False, example=True, missing=drop)
groups = ExtendedSchemaNode(String(),
Expand All @@ -3501,8 +3513,12 @@ class GetJobsQueries(ExtendedMappingSchema):
limit = ExtendedSchemaNode(Integer(allow_string=True), missing=10, default=10, validator=Range(min=0, max=10000))
datetime = DateTimeInterval(missing=drop, default=None)
status = JobStatusEnum(missing=drop, default=None)
process = ProcessIdentifier(missing=drop, default=None)
provider = AnyIdentifier(missing=drop, default=None)
processID = ProcessIdentifier(missing=drop, default=null, description="Alias to 'process' for OGC-API compliance.")
process = ProcessIdentifier(missing=drop, default=None, description="Identifier of the process to filter search.")
service = AnyIdentifier(missing=drop, default=null, description="Alias to 'provider' for backward compatibility.")
provider = AnyIdentifier(missing=drop, default=None, description="Identifier of service provider to filter search.")
type = JobTypeEnum(missing=drop, default=null,
description="Filter jobs only to matching type (note: 'service' and 'provider' are aliases).")
sort = JobSortEnum(missing=drop)
access = JobAccess(missing=drop, default=None)
notification_email = ExtendedSchemaNode(String(), missing=drop, validator=Email())
Expand Down

0 comments on commit 87c421b

Please sign in to comment.