From 87c421b812656c5b67efa25f324568c2536a950d Mon Sep 17 00:00:00 2001
From: Francis Charette Migneault
Date: Thu, 4 Nov 2021 16:14:47 -0400
Subject: [PATCH] support Job type and processID filter queries (relates to #268) + provide Job status type field (resolves #351) + increase OGC conformance (relates to #53, #231)

---
 CHANGES.rst                               |   8 +-
 docs/source/references.rst                |   3 +
 tests/wps_restapi/test_jobs.py            | 173 +++++++++++++++++++---
 weaver/datatype.py                        |  15 ++
 weaver/store/mongodb.py                   |  39 ++++-
 weaver/wps_restapi/api.py                 |   7 +
 weaver/wps_restapi/jobs/jobs.py           |  25 +++-
 weaver/wps_restapi/swagger_definitions.py |  24 ++-
 8 files changed, 254 insertions(+), 40 deletions(-)

diff --git a/CHANGES.rst b/CHANGES.rst
index e5c1f50c3..bee899f5a 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -10,11 +10,15 @@ Changes
 
 Changes:
 --------
-- No change.
+- Add support of ``type`` and ``processID`` query parameters for ``Job`` listing
+  (resolves some tasks in `#268 <https://github.com/crim-ca/weaver/issues/268>`_).
+- Add ``type`` field to ``Job`` status information
+  (resolves `#351 <https://github.com/crim-ca/weaver/issues/351>`_).
+- Add `OGC-API - Processes` conformance references regarding supported operations for ``Job`` listing and filtering.
 
 Fixes:
 ------
-- No change.
+- Allow ``group`` query parameter to handle ``Job`` category listing with ``provider`` as ``service`` alias.
 
 `4.2.1 <https://github.com/crim-ca/weaver/tree/4.2.1>`_ (2021-10-20)
 ========================================================================

diff --git a/docs/source/references.rst b/docs/source/references.rst
index b77a2ab3c..52913faf4 100644
--- a/docs/source/references.rst
+++ b/docs/source/references.rst
@@ -63,6 +63,9 @@
 .. |submit-issue| replace:: submit a new issue
 .. _submit-issue: https://github.com/crim-ca/weaver/issues/new/choose
 
+.. OGC-API references
+.. |ogc-statusInfo| replace:: https://raw.githubusercontent.com/opengeospatial/ogcapi-processes/master/core/openapi/schemas/statusInfo.yaml
+
 .. Example references
 .. |examples| replace:: Examples
 ..
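The ``type`` and ``processID`` query parameters introduced above combine freely on the job listing endpoint. A minimal client-side sketch of the intended usage (the host name and the ``requests`` calls are illustrative only, not part of this patch)::

    import requests  # illustrative HTTP client, not a dependency added by this patch

    WEAVER_URL = "https://weaver.example.com"  # hypothetical deployment

    # list only jobs executed by local processes (no remote provider involved)
    resp = requests.get(WEAVER_URL + "/jobs", params={"type": "process", "limit": 10})
    resp.raise_for_status()
    print(resp.json()["total"], resp.json()["jobs"])

    # list jobs of a given process identifier dispatched to any remote provider
    resp = requests.get(
        WEAVER_URL + "/jobs",
        params={"type": "provider", "processID": "process-other", "detail": "true"},
    )
    for job in resp.json()["jobs"]:
        # detailed status information now reports 'type' along the process/provider identifiers
        print(job["jobID"], job["type"], job["processID"], job["providerID"])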
_examples: examples.rst diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index 93102a3ea..cdb285f3c 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -80,6 +80,9 @@ def setUp(self): self.process_private = WpsTestProcess(identifier="process-private") self.process_store.save_process(self.process_private) self.process_store.set_visibility(self.process_private.identifier, VISIBILITY_PRIVATE) + self.process_other = WpsTestProcess(identifier="process-other") + self.process_store.save_process(self.process_other) + self.process_store.set_visibility(self.process_other.identifier, VISIBILITY_PUBLIC) self.process_unknown = "process-unknown" self.service_public = Service(name="service-public", url="http://localhost/wps/service-public", public=True) @@ -87,6 +90,11 @@ def setUp(self): self.service_private = Service(name="service-private", url="http://localhost/wps/service-private", public=False) self.service_store.save_service(self.service_private) + self.service_one = Service(name="service-one", url="http://localhost/wps/service-one", public=True) + self.service_store.save_service(self.service_one) + self.service_two = Service(name="service-two", url="http://localhost/wps/service-two", public=True) + self.service_store.save_service(self.service_two) + # create jobs accessible by index self.job_info = [] # type: List[Job] self.make_job(task_id="0000-0000-0000-0000", process=self.process_public.identifier, service=None, @@ -114,6 +122,13 @@ def setUp(self): self.make_job(task_id="8888-8888-8888-8888", process=self.process_private.identifier, service=self.service_private.name, created=self.datetime_interval[3], user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) + # jobs with duplicate 'process' identifier, but under a different 'service' name + self.make_job(task_id="9999-9999-9999-9999", + process=self.process_other.identifier, service=self.service_one.name, + user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) + self.make_job(task_id="1010-1010-1010-1010", + process=self.process_other.identifier, service=self.service_two.name, + user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) def make_job(self, task_id, process, service, user_id, status, progress, access, created=None): @@ -193,7 +208,6 @@ def check_basic_jobs_info(response): assert "total" in response.json and isinstance(response.json["total"], int) assert "limit" in response.json and isinstance(response.json["limit"], int) assert len(response.json["jobs"]) <= response.json["limit"] - assert response.json["page"] == response.json["total"] // response.json["limit"] @staticmethod def check_basic_jobs_grouped_info(response, groups): @@ -282,13 +296,16 @@ def test_get_jobs_valid_grouping_by_process(self): elif categories["process"] == self.process_unknown: assert len(grouped_jobs["jobs"]) == 1 assert set(grouped_jobs["jobs"]) == {self.job_info[1].id} + elif categories["process"] == self.process_other.identifier: + assert len(grouped_jobs["jobs"]) == 2 + assert set(grouped_jobs["jobs"]) == {self.job_info[9].id, self.job_info[10].id} else: - pytest.fail("Unknown job grouping 'process' value not expected.") + pytest.fail("Unknown job grouping 'process' value: {}".format(categories["process"])) - def test_get_jobs_valid_grouping_by_service(self): - path = get_path_kvp(sd.jobs_service.path, detail="false", groups="service") + def 
template_get_jobs_valid_grouping_by_service_provider(self, service_or_provider): + path = get_path_kvp(sd.jobs_service.path, detail="false", groups=service_or_provider) resp = self.app.get(path, headers=self.json_headers) - self.check_basic_jobs_grouped_info(resp, groups="service") + self.check_basic_jobs_grouped_info(resp, groups=service_or_provider) # ensure that group categories are distinct for i, grouped_jobs in enumerate(resp.json["groups"]): @@ -300,17 +317,32 @@ def test_get_jobs_valid_grouping_by_service(self): assert categories != compared # validate groups with expected jobs counts and ids (nb: only public jobs are returned) - if categories["service"] == self.service_public.name: + if categories[service_or_provider] == self.service_public.name: assert len(grouped_jobs["jobs"]) == 3 assert set(grouped_jobs["jobs"]) == {self.job_info[1].id, self.job_info[5].id, self.job_info[6].id} - elif categories["service"] == self.service_private.name: + elif categories[service_or_provider] == self.service_private.name: assert len(grouped_jobs["jobs"]) == 2 assert set(grouped_jobs["jobs"]) == {self.job_info[7].id, self.job_info[8].id} - elif categories["service"] is None: + elif categories[service_or_provider] == self.service_one.name: + assert len(grouped_jobs["jobs"]) == 1 + assert set(grouped_jobs["jobs"]) == {self.job_info[9].id} + elif categories[service_or_provider] == self.service_two.name: + assert len(grouped_jobs["jobs"]) == 1 + assert set(grouped_jobs["jobs"]) == {self.job_info[10].id} + elif categories[service_or_provider] is None: assert len(grouped_jobs["jobs"]) == 2 assert set(grouped_jobs["jobs"]) == {self.job_info[0].id, self.job_info[2].id} else: - pytest.fail("Unknown job grouping 'service' value not expected.") + pytest.fail("Unknown job grouping 'service' value: {}".format(categories[service_or_provider])) + + def test_get_jobs_valid_grouping_by_service(self): + self.template_get_jobs_valid_grouping_by_service_provider("service") + + def test_get_jobs_valid_grouping_by_provider(self): + """ + Grouping by ``provider`` must work as alias to ``service`` and must be adjusted inplace in response categories. 
+ """ + self.template_get_jobs_valid_grouping_by_service_provider("provider") def test_get_jobs_links_navigation(self): """ @@ -324,14 +356,21 @@ def get_links(resp_links): link_dict[_link["rel"]] = _link["href"] return link_dict - assert len(self.job_store.list_jobs()) == 9, "expected number of jobs mismatch, following test might not work" + expect_jobs_total = len(self.job_info) + expect_jobs_visible = len(list(filter(lambda j: VISIBILITY_PUBLIC in j.access, self.job_info))) + assert len(self.job_store.list_jobs()) == expect_jobs_total, ( + "expected number of jobs mismatch, following test might not work" + ) path = get_path_kvp(sd.jobs_service.path, limit=1000) resp = self.app.get(path, headers=self.json_headers) - assert len(resp.json["jobs"]) == 7, "unexpected number of visible jobs" + assert len(resp.json["jobs"]) == expect_jobs_visible, "unexpected number of visible jobs" base_url = self.settings["weaver.url"] jobs_url = base_url + sd.jobs_service.path - limit = 2 # expect 7 jobs to be visible, making 4 pages of 2 + limit = 2 # expect 9 jobs to be visible, making 5 pages of 2 + last = 4 + last_page = "page={}".format(last) + prev_last_page = "page={}".format(last - 1) limit_kvp = "limit={}".format(limit) path = get_path_kvp(sd.jobs_service.path, limit=limit) resp = self.app.get(path, headers=self.json_headers) @@ -345,7 +384,7 @@ def get_links(resp_links): assert links["prev"] is None, "no previous on first page (default page=0 used)" assert links["next"].startswith(jobs_url) and limit_kvp in links["next"] and "page=1" in links["next"] assert links["first"].startswith(jobs_url) and limit_kvp in links["first"] and "page=0" in links["first"] - assert links["last"].startswith(jobs_url) and limit_kvp in links["last"] and "page=3" in links["last"] + assert links["last"].startswith(jobs_url) and limit_kvp in links["last"] and last_page in links["last"] path = get_path_kvp(sd.jobs_service.path, limit=limit, page=2) resp = self.app.get(path, headers=self.json_headers) @@ -359,9 +398,9 @@ def get_links(resp_links): assert links["prev"].startswith(jobs_url) and limit_kvp in links["prev"] and "page=1" in links["prev"] assert links["next"].startswith(jobs_url) and limit_kvp in links["next"] and "page=3" in links["next"] assert links["first"].startswith(jobs_url) and limit_kvp in links["first"] and "page=0" in links["first"] - assert links["last"].startswith(jobs_url) and limit_kvp in links["last"] and "page=3" in links["last"] + assert links["last"].startswith(jobs_url) and limit_kvp in links["last"] and last_page in links["last"] - path = get_path_kvp(sd.jobs_service.path, limit=limit, page=3) + path = get_path_kvp(sd.jobs_service.path, limit=limit, page=last) resp = self.app.get(path, headers=self.json_headers) links = get_links(resp.json["links"]) assert len(resp.json["jobs"]) == 1, "last page should show only remaining jobs within limit" @@ -369,11 +408,11 @@ def get_links(resp_links): assert links["collection"] == jobs_url assert links["search"] == jobs_url assert links["up"] is None, "generic jobs endpoint doesn't have any parent collection" - assert links["current"].startswith(jobs_url) and limit_kvp in links["current"] and "page=3" in links["current"] - assert links["prev"].startswith(jobs_url) and limit_kvp in links["prev"] and "page=2" in links["prev"] + assert links["current"].startswith(jobs_url) and limit_kvp in links["current"] and last_page in links["current"] + assert links["prev"].startswith(jobs_url) and limit_kvp in links["prev"] and prev_last_page in links["prev"] assert 
links["next"] is None, "no next page on last" assert links["first"].startswith(jobs_url) and limit_kvp in links["first"] and "page=0" in links["first"] - assert links["last"].startswith(jobs_url) and limit_kvp in links["last"] and "page=3" in links["last"] + assert links["last"].startswith(jobs_url) and limit_kvp in links["last"] and last_page in links["last"] p_id = self.process_public.identifier # 5 jobs with this process, but only 3 visible p_j_url = base_url + sd.process_jobs_service.path.format(process_id=p_id) @@ -420,7 +459,7 @@ def get_links(resp_links): path = get_path_kvp(sd.jobs_service.path, limit=over_limit) resp = self.app.get(path, headers=self.json_headers) links = get_links(resp.json["links"]) - assert len(resp.json["jobs"]) == 7, "only 7 of 9 existing jobs should be visible" + assert len(resp.json["jobs"]) == expect_jobs_visible assert links["alternate"] is None assert links["collection"] == jobs_url assert links["search"] == jobs_url @@ -464,6 +503,96 @@ def test_get_jobs_by_encrypted_email(self): assert resp.json["total"] == 1, "Should match exactly 1 email with specified literal string as query param." assert resp.json["jobs"][0]["jobID"] == job_id + def test_get_jobs_by_type_process(self): + path = get_path_kvp(sd.jobs_service.path, type="process") + resp = self.app.get(path, headers=self.json_headers) + self.check_basic_jobs_info(resp) + expect_jobs = [self.job_info[i].id for i in [0, 2]] # idx=2 & idx>4 have 'service', only 0,2 are public + result_jobs = resp.json["jobs"] + assert len(resp.json["jobs"]) == len(expect_jobs) + assert resp.json["total"] == len(expect_jobs) + assert all(job in expect_jobs for job in result_jobs), self.message_with_jobs_diffs(result_jobs, expect_jobs) + + def test_get_jobs_by_type_process_and_specific_process_id(self): + path = get_path_kvp(sd.jobs_service.path, type="process", process=self.process_public.identifier) + resp = self.app.get(path, headers=self.json_headers) + self.check_basic_jobs_info(resp) + assert len(resp.json["jobs"]) == 1 + expect_job = self.job_info[0].id + assert resp.json["jobs"][0] == expect_job, self.message_with_jobs_mapping("expected only matching process") + + def test_get_jobs_by_type_process_and_specific_service_name(self): + """ + Requesting provider ``type`` with a specific ``process`` identifier cannot yield any valid result (contradicts). + + .. seealso:: + Test :meth:`test_get_jobs_by_type_process_and_specific_process_id` that contains a valid match otherwise + for the given process identifier. 
+        """
+        path = get_path_kvp(sd.jobs_service.path, type="process", provider=self.service_public.name)
+        resp = self.app.get(path, headers=self.json_headers, expect_errors=True)
+        assert resp.status_code == 400
+        assert "cause" in resp.json and resp.json["cause"] == {"type": "process", "service": self.service_public.name}
+
+    def template_get_jobs_by_type_service_provider(self, service_or_provider):
+        path = get_path_kvp(sd.jobs_service.path, type=service_or_provider)
+        resp = self.app.get(path, headers=self.json_headers)
+        self.check_basic_jobs_info(resp)
+        expect_jobs = [self.job_info[i].id for i in [1, 5, 6, 7, 8, 9, 10]]  # has 'service' & public access
+        result_jobs = resp.json["jobs"]
+        assert len(resp.json["jobs"]) == len(expect_jobs)
+        assert resp.json["total"] == len(expect_jobs)
+        assert all(job in expect_jobs for job in result_jobs), self.message_with_jobs_diffs(result_jobs, expect_jobs)
+
+    def test_get_jobs_by_type_service(self):
+        self.template_get_jobs_by_type_service_provider("service")
+
+    def test_get_jobs_by_type_provider(self):
+        self.template_get_jobs_by_type_service_provider("provider")
+
+    def test_get_jobs_by_type_provider_and_specific_service_name(self):
+        path = get_path_kvp(sd.jobs_service.path, type="provider", provider=self.service_public.name)
+        resp = self.app.get(path, headers=self.json_headers)
+        self.check_basic_jobs_info(resp)
+        expect_jobs = [self.job_info[i].id for i in [1, 5, 6]]  # has 'service' & public access, others not same name
+        result_jobs = resp.json["jobs"]
+        assert len(resp.json["jobs"]) == len(expect_jobs)
+        assert resp.json["total"] == len(expect_jobs)
+        assert all(job in expect_jobs for job in result_jobs), self.message_with_jobs_diffs(result_jobs, expect_jobs)
+
+    def test_get_jobs_by_type_provider_and_specific_process_id(self):
+        """
+        Requesting provider ``type`` with a more specific ``process`` identifier further filters the results.
+
+        .. note::
+            Technically, two distinct providers could employ the same sub-process identifier.
+            This should not impact nor create a conflict here.
+
+        Test :meth:`test_get_jobs_by_type_provider` should return more results since no sub-process filtering applies.
+
+        Extra process from another provider than in :meth:`test_get_jobs_by_type_provider_and_specific_service_name`
+        should now be returned as well.
+
+        ..
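For reference, the contradicting ``type=process`` + ``provider`` combination exercised above is rejected before querying the database. The relevant portion of the HTTP 400 body would look roughly as follows (a sketch assembled from the test assertions and the ``JobInvalidParameter`` arguments further below, not a captured response)::

    # GET /jobs?type=process&provider=service-public  ->  HTTP 400
    expected_error_content = {
        "description": "Ambiguous type requested contradicts with requested service provider.",
        "cause": {"type": "process", "service": "service-public"},
        # any other fields (title, error type, etc.) depend on the generic error handler
    }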
seealso:: + - :meth:`test_get_jobs_by_type_provider` + - :meth:`test_get_jobs_by_type_provider_and_specific_service_name` + """ + path = get_path_kvp(sd.jobs_service.path, type="provider", process=self.process_other.identifier, detail=True) + resp = self.app.get(path, headers=self.json_headers) + self.check_basic_jobs_info(resp) + assert len(resp.json["jobs"]) == 2 + expect_jobs = [self.job_info[i].id for i in [9, 10]] + result_jobs = [job["jobID"] for job in resp.json["jobs"]] + assert len(result_jobs) == len(expect_jobs) + assert resp.json["total"] == len(expect_jobs) + assert all(job in expect_jobs for job in result_jobs), self.message_with_jobs_diffs(result_jobs, expect_jobs) + for job in resp.json["jobs"]: + assert job["processID"] == self.process_other.identifier + if job["jobID"] == self.job_info[9].id: + assert job["providerID"] == self.service_one.name + if job["jobID"] == self.job_info[10].id: + assert job["providerID"] == self.service_two.name + def test_get_jobs_process_in_query_normal(self): path = get_path_kvp(sd.jobs_service.path, process=self.job_info[0].process) resp = self.app.get(path, headers=self.json_headers) @@ -641,7 +770,7 @@ def filter_service(jobs): # type: (Iterable[Job]) -> List[Job] stack.enter_context(patch) for patch in mocked_remote_wps([self.process_public]): stack.enter_context(patch) - test = get_path_kvp(path, access=access) if access else path + test = get_path_kvp(path, access=access, limit=1000) if access else path resp = self.app.get(test, headers=self.json_headers) self.check_basic_jobs_info(resp) job_ids = [job.id for job in expected_jobs] @@ -693,7 +822,9 @@ def test_jobs_datetime_before(self): resp = self.app.get(path, headers=self.json_headers) assert resp.status_code == 200 assert resp.content_type == CONTENT_TYPE_APP_JSON - assert len(resp.json["jobs"]) == 4 + # generated datetime interval have an offset that makes all job in the future + # anything created "recently" and publicly visible will be listed here + assert len(resp.json["jobs"]) == 6 for job in resp.json["jobs"]: base_uri = sd.jobs_service.path + "/{}".format(job) path = get_path_kvp(base_uri) diff --git a/weaver/datatype.py b/weaver/datatype.py index 91abde9bb..5e037c8b3 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -605,6 +605,20 @@ def process(self, process): raise TypeError("Type 'str' is required for '{}.process'".format(type(self))) self["process"] = process + @property + def type(self): + # type: () -> str + """ + Obtain the type of the element associated to the creation of this job. + + .. 
seealso:: + - Defined in |ogc-statusInfo| + - Queried with https://docs.ogc.org/DRAFTS/18-062.html#_parameter_type + """ + if self.service is None: + return "process" + return "provider" + def _get_inputs(self): # type: () -> List[Optional[Dict[str, Any]]] if self.get("inputs") is None: @@ -1024,6 +1038,7 @@ def json(self, container=None, self_link=None): # pylint: disable=W0221,argu "jobID": self.id, "processID": self.process, "providerID": self.service, + "type": self.type, "status": map_status(self.status), "message": self.status_message, "created": self.created, diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 77af19c65..2fe5c0b31 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -54,7 +54,10 @@ from pymongo.collection import Collection from weaver.store.base import DatetimeIntervalType, JobCategoriesAndCount, JobListAndCount - from weaver.typedefs import AnyProcess, AnyProcessType + from weaver.typedefs import AnyProcess, AnyProcessType, AnyValue + + AnyValueMongo = Union[AnyValue, datetime.datetime] + SearchFilterMongo = Union[AnyValueMongo, Dict[str, Union[AnyValueMongo, List[AnyValueMongo]]]] LOGGER = logging.getLogger(__name__) @@ -542,6 +545,7 @@ def list_jobs(self): def find_jobs(self, process=None, # type: Optional[str] service=None, # type: Optional[str] + type=None, # type: Optional[str] tags=None, # type: Optional[List[str]] access=None, # type: Optional[str] notification_email=None, # type: Optional[str] @@ -583,6 +587,7 @@ def find_jobs(self, :param request: request that lead to this call to obtain permissions and user id. :param process: process name to filter matching jobs. :param service: service name to filter matching jobs. + :param type: filter matching jobs for given type. :param tags: list of tags to filter matching jobs. :param access: access visibility to filter matching jobs (default: :py:data:`VISIBILITY_PUBLIC`). :param notification_email: notification email to filter matching jobs. 
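The ``type`` reported for a job is derived purely from the presence of an associated ``service``. A standalone sketch of that rule, mirroring the new ``Job.type`` property above (not the actual ``weaver.datatype`` code)::

    def job_type(service):
        # jobs dispatched to a remote provider carry a 'service' name,
        # while purely local process executions do not
        return "process" if service is None else "provider"

    assert job_type(None) == "process"            # e.g. a job of the local 'process-public'
    assert job_type("service-one") == "provider"  # e.g. a job forwarded to remote 'service-one'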
@@ -603,7 +608,7 @@ def find_jobs(self, "locator": "tags", }) - search_filters = {} + search_filters = {} # type: Dict[str, SearchFilterMongo] if not request: search_filters.setdefault("access", VISIBILITY_PUBLIC) @@ -623,17 +628,32 @@ def find_jobs(self, search_filters["tags"] = {"$all": tags} if status in JOB_STATUS_CATEGORIES: - search_filters["status"] = {"$in": JOB_STATUS_CATEGORIES[status]} + category_statuses = list(JOB_STATUS_CATEGORIES[status]) + search_filters["status"] = {"$in": category_statuses} elif status: search_filters["status"] = status if notification_email is not None: search_filters["notification_email"] = notification_email + if type == "process": + search_filters["service"] = None + elif type == "provider": + search_filters["service"] = {"$ne": None} + if process is not None: + # if (type=provider and process=) + # doesn't contradict since it can be more specific about sub-process of service search_filters["process"] = process if service is not None: + # can override 'service' set by 'type' to be more specific, but must be logical + # (e.g.: type=process and service= cannot ever yield anything) + if search_filters.get("service", -1) is None: + raise JobInvalidParameter(json={ + "description": "Ambiguous type requested contradicts with requested service provider.", + "cause": {"service": service, "type": type} + }) search_filters["service"] = service if datetime is not None: @@ -656,8 +676,9 @@ def find_jobs(self, sort = "user_id" if sort not in JOB_SORT_VALUES: raise JobInvalidParameter(json={ - "description": "Invalid sorting method: '{}'".format(repr(sort)), - "locator": "sort" + "description": "Invalid sorting method.", + "cause": "sort", + "value": str(sort), }) sort_order = DESCENDING if sort in (SORT_FINISHED, SORT_CREATED) else ASCENDING sort_criteria = {sort: sort_order} @@ -668,6 +689,10 @@ def find_jobs(self, # results by group categories if group_by: group_by = [group_by] if isinstance(group_by, str) else group_by # type: List[str] + has_provider = "provider" in group_by + if has_provider: + group_by.remove("provider") + group_by.append("service") group_categories = {field: "$" + field for field in group_by} # fields that can generate groups pipeline.extend([{ "$group": { @@ -685,6 +710,10 @@ def find_jobs(self, found = self.collection.aggregate(pipeline) items = [{k: (v if k != "jobs" else [Job(j) for j in v]) # convert to Job object where applicable for k, v in i.items()} for i in found] + if has_provider: + for group_result in items: + group_service = group_result["category"].pop("service", None) + group_result["category"]["provider"] = group_service # results with paging else: diff --git a/weaver/wps_restapi/api.py b/weaver/wps_restapi/api.py index a1b98d0e6..bdeef04f7 100644 --- a/weaver/wps_restapi/api.py +++ b/weaver/wps_restapi/api.py @@ -359,6 +359,13 @@ def api_conformance(request): # noqa: F811 # ogcapi_processes + "/req/core/job-exception-no-such-job", # ogcapi_processes + "/req/core/job-results-exception/no-such-job", ogcapi_processes + "/req/job-list/links", + ogcapi_processes + "/rec/job-list/job-list-landing-page", + ogcapi_processes + "/req/job-list/job-list-op", + ogcapi_processes + "/req/job-list/processID-definition", + ogcapi_processes + "/req/job-list/processID-mandatory", + ogcapi_processes + "/req/job-list/processid-response", + ogcapi_processes + "/req/job-list/type-definition", + ogcapi_processes + "/req/job-list/type-response", # FIXME: # 
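The translation of the ``type``, ``process`` and ``service`` parameters into MongoDB search criteria can be summarized by the following standalone sketch (simplified from the ``find_jobs`` logic above, with ``JobInvalidParameter`` replaced by a plain ``ValueError`` for illustration)::

    def job_search_filters(job_type=None, process=None, service=None):
        filters = {}
        if job_type == "process":
            filters["service"] = None            # only jobs without an associated provider
        elif job_type == "provider":
            filters["service"] = {"$ne": None}   # only jobs dispatched to a remote provider
        if process is not None:
            filters["process"] = process         # can narrow down a provider's sub-process
        if service is not None:
            if filters.get("service", -1) is None:
                # 'type=process' combined with an explicit provider can never match anything
                raise ValueError("Ambiguous type requested contradicts with requested service provider.")
            filters["service"] = service
        return filters

    assert job_search_filters(job_type="process") == {"service": None}
    assert job_search_filters(job_type="provider", process="process-other") == {
        "service": {"$ne": None}, "process": "process-other",
    }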
https://github.com/opengeospatial/ogcapi-processes/blob/master/core/requirements/core/REQ_job-results-exception-results-not-ready.adoc # type of exception SHALL be "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready" diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index 899ed112a..373d4c14b 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -240,8 +240,16 @@ def validate_service_process(request): """ Verifies that service or process specified by path or query will raise the appropriate error if applicable. """ - service_name = request.matchdict.get("provider_id", None) or request.params.get("service", None) - process_name = request.matchdict.get("process_id", None) or request.params.get("process", None) + service_name = ( + request.matchdict.get("provider_id", None) or + request.params.get("provider", None) or + request.params.get("service", None) # backward compatibility + ) + process_name = ( + request.matchdict.get("process_id", None) or + request.params.get("process", None) or + request.params.get("processID", None) # OGC-API conformance + ) item_test = None item_type = None @@ -298,16 +306,17 @@ def get_queried_jobs(request): settings = get_settings(request) service, process = validate_service_process(request) - if service: - forbid_local_only(settings) - filters = {**request.params, "process": process, "provider": service} + params = dict(request.params) + for param_name in ["process", "processID", "provider", "service"]: + params.pop(param_name, None) + filters = {**params, "process": process, "provider": service} - filters["detail"] = asbool(request.params.get("detail")) + filters["detail"] = asbool(params.get("detail")) - if request.params.get("datetime", False): + if params.get("datetime", False): # replace white space with '+' since request.params replaces '+' with whitespaces when parsing - filters["datetime"] = request.params["datetime"].replace(" ", "+") + filters["datetime"] = params["datetime"].replace(" ", "+") try: filters = sd.GetJobsQueries().deserialize(filters) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index d2390cb2d..6eb44fd66 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -1319,6 +1319,14 @@ class JobStatusEnum(ExtendedSchemaNode): validator = OneOf(JOB_STATUS_CODE_API) +class JobTypeEnum(ExtendedSchemaNode): + schema_type = String + title = "JobType" + default = null + example = "process" + validator = OneOf(["process", "provider", "service"]) + + class JobSortEnum(ExtendedSchemaNode): schema_type = String title = "JobSortingMethod" @@ -2474,6 +2482,7 @@ class JobStatusInfo(ExtendedMappingSchema): description="Process identifier corresponding to the job execution.") providerID = ProcessIdentifier(missing=None, default=None, description="Provider identifier corresponding to the job execution.") + type = JobTypeEnum(description="Type of the element associated to the creation of this job.") status = JobStatusEnum(description="Last updated status.") message = ExtendedSchemaNode(String(), missing=drop, description="Information about the last status update.") created = ExtendedSchemaNode(DateTime(), missing=drop, default=None, @@ -2557,8 +2566,8 @@ class GetGroupedJobsSchema(ExtendedMappingSchema): class GetQueriedJobsSchema(OneOfKeywordSchema): _one_of = [ - GetPagingJobsSchema, - GetGroupedJobsSchema, + GetPagingJobsSchema(description="Matched jobs according to 
filter queries."), + GetGroupedJobsSchema(description="Matched jobs grouped by specified categories."), ] total = ExtendedSchemaNode(Integer(), description="Total number of matched jobs regardless of grouping or paging result.") @@ -3492,6 +3501,9 @@ class PostProcessJobsEndpoint(ProcessPath): class GetJobsQueries(ExtendedMappingSchema): + # note: + # This schema is also used to generate any missing defaults during filter parameter handling. + # Items with default value are added if omitted, except 'default=null' which are removed after handling by alias. detail = ExtendedSchemaNode(Boolean(), description="Provide job details instead of IDs.", default=False, example=True, missing=drop) groups = ExtendedSchemaNode(String(), @@ -3501,8 +3513,12 @@ class GetJobsQueries(ExtendedMappingSchema): limit = ExtendedSchemaNode(Integer(allow_string=True), missing=10, default=10, validator=Range(min=0, max=10000)) datetime = DateTimeInterval(missing=drop, default=None) status = JobStatusEnum(missing=drop, default=None) - process = ProcessIdentifier(missing=drop, default=None) - provider = AnyIdentifier(missing=drop, default=None) + processID = ProcessIdentifier(missing=drop, default=null, description="Alias to 'process' for OGC-API compliance.") + process = ProcessIdentifier(missing=drop, default=None, description="Identifier of the process to filter search.") + service = AnyIdentifier(missing=drop, default=null, description="Alias to 'provider' for backward compatibility.") + provider = AnyIdentifier(missing=drop, default=None, description="Identifier of service provider to filter search.") + type = JobTypeEnum(missing=drop, default=null, + description="Filter jobs only to matching type (note: 'service' and 'provider' are aliases).") sort = JobSortEnum(missing=drop) access = JobAccess(missing=drop, default=None) notification_email = ExtendedSchemaNode(String(), missing=drop, validator=Email())
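As a closing illustration of the alias handling described in the ``GetJobsQueries`` note above, a simplified standalone sketch of how the duplicate query parameters collapse into the canonical ones (the function name and structure are for demonstration only; the real resolution is done by the schema defaults and ``validate_service_process``)::

    def resolve_job_query_aliases(params):
        # 'processID' (OGC-API) and 'service' (legacy) are accepted as input,
        # but the search itself is driven by the canonical 'process' and 'provider'
        params = dict(params)
        process = params.pop("process", None) or params.pop("processID", None)
        provider = params.pop("provider", None) or params.pop("service", None)
        params.pop("processID", None)
        params.pop("service", None)
        return {**params, "process": process, "provider": provider}

    assert resolve_job_query_aliases({"processID": "process-other", "type": "provider"}) == {
        "type": "provider", "process": "process-other", "provider": None,
    }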