Skip to content

Commit

Permalink
[wip] working updates/listing - hacked tag 'id:version' as pseudo-unique ID
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Jun 17, 2022
1 parent aa687e1 commit 417cfcc
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 30 deletions.
48 changes: 47 additions & 1 deletion tests/wps_restapi/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,53 @@ def test_get_processes_bad_request_paging_providers(self):
assert "ListingInvalidParameter" in resp.json["error"]

def test_get_processes_with_revisions(self):
raise NotImplementedError # FIXME
"""Example:
http://localhost:4002/processes?detail=false&revisions=true
"processes": [
"anti-spoofing:0.1.0",
"CatFile:1.0.0",
"ColibriFlyingpigeon_SubsetBbox",
"demo-cat-file:1.0.0",
"docker-demo-cat",
"docker-python-script",
"DockerNetCDF2Text",
"Echo:1.0.0",
"file_index_selector:1.1.0",
"file2string_array:1.2.0",
"image-utils:0.0.1",
"jsonarray2netcdf:1.3.0",
"las2tif",
"memory-usage",
"memory-usage-2",
"memory-usage-3",
"memory-usage-4",
"memory-usage-5",
"memory-usage-6",
"memory-usage-script",
"metalink2netcdf:1.2.0",
"OutardeFlyingpigeon_SubsetBbox",
"python-script",
"sleep",
"Staging_S2L1C:0.0.1",
"Staging_S2L1C-mock-docker:0.0.1",
"test_blurring:0.0.1",
"test_generation:0.0.1",
"test_workflow:0.0.1",
"test-echo:1.0.0",
"test-report",
"WaterExtent_S2-mock-docker:0.0.1",
"WorkflowWaterExtent:0.0.1",
"WorkflowWaterExtent-mock:0.0.1",
"WPS1JsonArray2NetCDF:0.0.1"
],
"""
# FIXME:
# create some processes with different combinations of revisions, no-version, single-version
path = get_path_kvp("/processes", revisions=True, detail=False)
resp = self.app.get(path, headers=self.json_headers, expect_errors=True)

raise NotImplementedError

@mocked_remote_server_requests_wps1([
resources.TEST_REMOTE_SERVER_URL,
Expand Down
10 changes: 5 additions & 5 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ def tag(self):
proc_id = self.id.split(":")[0]
# bw-compat, if no version available, no update was applied (single deploy)
# there is no need to define a tag as only one result can be found
# on next (if any) update request, this reversion will be updated with a default version
# on next (if any) update request, this revision will be updated with a default version
if self.version is None:
return proc_id
version = as_version_major_minor_patch(self.version, VersionFormat.STRING)
Expand Down Expand Up @@ -2235,8 +2235,8 @@ def links(self, container=None):
proc_hist = f"{proc_list}?detail=false&revisions=true&process={self.id}"
links.extend([
{"href": proc_tag, "rel": "working-copy", "title": "Tagged version of this process description."},
{"href": proc_desc, "rel": "latest-version", "title": "Most recent reversion of this process."},
{"href": proc_hist, "rel": "version-history", "title": "Listing of all reversions of this process."},
{"href": proc_desc, "rel": "latest-version", "title": "Most recent revision of this process."},
{"href": proc_hist, "rel": "version-history", "title": "Listing of all revisions of this process."},
])
versions = get_db(container).get_store(StoreProcesses).find_versions(self.id, VersionFormat.OBJECT)
proc_ver = as_version_major_minor_patch(self.version, VersionFormat.OBJECT)
Expand All @@ -2245,12 +2245,12 @@ def links(self, container=None):
if prev_ver:
proc_prev = f"{proc_desc}:{prev_ver[-1]!s}"
links.append(
{"href": proc_prev, "rel": "predecessor-version", "title": "Previous reversion of this process."}
{"href": proc_prev, "rel": "predecessor-version", "title": "Previous revision of this process."}
)
if next_ver:
proc_next = f"{proc_desc}:{next_ver[0]!s}"
links.append(
{"href": proc_next, "rel": "successor-version", "title": "Next reversion of this process."}
{"href": proc_next, "rel": "successor-version", "title": "Next revision of this process."}
)
if self.service:
api_base_url = proc_list.rsplit("/", 1)[0]
Expand Down
58 changes: 35 additions & 23 deletions weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
from weaver.visibility import AnyVisibility

MongodbValue = Union[AnyValueType, datetime.datetime]
MongodbSearchFilter = Dict[str, Union[MongodbValue, List[MongodbValue], Dict[str, AnyValueType]]]
MongodbSearchStep = Union[MongodbValue, MongodbSearchFilter]
MongodbSearchPipeline = List[Dict[str, Union[str, Dict[str, MongodbSearchStep]]]]
MongodbAggregateExpression = Dict[str, Union[MongodbValue, List[MongodbValue], Dict[str, AnyValueType]]]
MongodbAggregateStep = Union[MongodbValue, MongodbAggregateExpression]
MongodbAggregatePipeline = List[Dict[str, Union[str, Dict[str, MongodbAggregateStep]]]]

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -207,7 +207,7 @@ def clear_services(self):
class ListingMixin(object):
@staticmethod
def _apply_paging_pipeline(page, limit):
# type: (Optional[int], Optional[int]) -> List[MongodbSearchStep]
# type: (Optional[int], Optional[int]) -> List[MongodbAggregateStep]
if isinstance(page, int) and isinstance(limit, int):
return [{"$skip": page * limit}, {"$limit": limit}]
if page is None and isinstance(limit, int):
Expand All @@ -216,7 +216,7 @@ def _apply_paging_pipeline(page, limit):

@staticmethod
def _apply_sort_method(sort_field, sort_default, sort_allowed):
# type: (Optional[str], str, List[str]) -> MongodbSearchFilter
# type: (Optional[str], str, List[str]) -> MongodbAggregateExpression
sort = sort_field # keep original sort field in case of error
if sort is None:
sort = sort_default
Expand All @@ -233,7 +233,7 @@ def _apply_sort_method(sort_field, sort_default, sort_allowed):

@staticmethod
def _apply_total_result(search_pipeline, extra_pipeline):
# type: (MongodbSearchPipeline, MongodbSearchPipeline) -> MongodbSearchPipeline
# type: (MongodbAggregatePipeline, MongodbAggregatePipeline) -> MongodbAggregatePipeline
"""
Extends the pipeline operations in order to obtain the grand total of matches in parallel to other filtering.
Expand Down Expand Up @@ -488,7 +488,7 @@ def list_processes(self,
List of sorted, and possibly page-filtered, processes matching queries.
If ``total`` was requested, return a tuple of this list and the number of processes.
"""
search_filters = {} # type: MongodbSearchFilter
search_filters = {} # type: MongodbAggregateExpression

if process and revisions:
search_filters["identifier"] = {"$regex": rf"^{process}(:.*)?$"} # revisions of that process
Expand All @@ -507,6 +507,7 @@ def list_processes(self,
if vis not in Visibility:
raise ValueError(f"Invalid visibility value '{v!s}' is not one of {list(Visibility.values())!s}")
search_filters["visibility"] = {"$in": list(visibility)}
insert_fields = [] # type: MongodbAggregatePipeline

# processes do not have 'created', but ObjectID in '_id' has the particularity of embedding creation time
if sort == Sort.CREATED:
Expand All @@ -517,12 +518,23 @@ def list_processes(self,
sort_allowed = list(SortMethods.PROCESS) + ["_id"]
sort_fields = self._apply_sort_method(sort, Sort.ID_LONG, sort_allowed)
if revisions and sort in [Sort.ID, Sort.ID_LONG, Sort.PROCESS]:
# if listing many revisions, sort by version on top of ID to make listing more natural
sort_version = self._apply_sort_method(Sort.VERSION, Sort.VERSION, sort_allowed)
sort_fields.update(sort_version)
sort_method = {"$sort": sort_fields}

search_pipeline = [{"$match": search_filters}, sort_method]
# If listing many revisions, sort by version on top of ID to make listing more natural.
# Because the "latest version" is saved with 'id' only while "older revisions" are saved with 'id:version',
# that more recent version would always appear first since alphabetical sort: 'id' (latest) < 'id:version'.
# Work around this by dynamically reassigning 'id' by itself.
insert_fields = [
{"$set": {"tag": {"$cond": {
"if": {"identifier": "/^.*:.*$/"},
"then": "$identifier",
"else": {"$concat": ["$identifier", ":", "$version"]},
}}}},
{"$set": {"id_version": {"$split": ["$tag", ":"]}}},
{"$set": {"identifier": {"$arrayElemAt": ["$id_version", 0]}}},
]
sort_fields = {"identifier": pymongo.ASCENDING, "version": pymongo.ASCENDING}
sort_method = [{"$sort": sort_fields}]

search_pipeline = insert_fields + [{"$match": search_filters}] + sort_method
paging_pipeline = self._apply_paging_pipeline(page, limit)
if total:
pipeline = self._apply_total_result(search_pipeline, paging_pipeline)
Expand Down Expand Up @@ -577,7 +589,7 @@ def find_versions(self, process_id, version_format=VersionFormat.OBJECT):
sane_name = get_sane_name(process_id, **self.sane_name_config)
version_name = rf"^{sane_name}(:[0-9]+\.[0-9]+.[0-9]+)?$"
versions = self.collection.find(
filter={"identifier": version_name},
filter={"identifier": {"$regex": version_name}},
projection={"_id": False, "version": True},
sort=[(Sort.VERSION, pymongo.ASCENDING)],
)
Expand Down Expand Up @@ -854,7 +866,7 @@ def find_jobs(self,
return results

def _find_jobs_grouped(self, pipeline, group_categories):
# type: (MongodbSearchPipeline, List[str]) -> Tuple[JobGroupCategory, int]
# type: (MongodbAggregatePipeline, List[str]) -> Tuple[JobGroupCategory, int]
"""
Retrieves jobs regrouped by specified field categories and predefined search pipeline filters.
"""
Expand Down Expand Up @@ -892,7 +904,7 @@ def _find_jobs_grouped(self, pipeline, group_categories):
return items, total

def _find_jobs_paging(self, search_pipeline, page, limit):
# type: (MongodbSearchPipeline, Optional[int], Optional[int]) -> Tuple[List[Job], int]
# type: (MongodbAggregatePipeline, Optional[int], Optional[int]) -> Tuple[List[Job], int]
"""
Retrieves jobs limited by specified paging parameters and predefined search pipeline filters.
"""
Expand Down Expand Up @@ -921,7 +933,7 @@ def _apply_tags_filter(tags):

@staticmethod
def _apply_access_filter(access, request):
# type: (AnyVisibility, Request) -> MongodbSearchFilter
# type: (AnyVisibility, Request) -> MongodbAggregateExpression
search_filters = {}
if not request:
search_filters["access"] = Visibility.PUBLIC
Expand All @@ -943,9 +955,9 @@ def _apply_access_filter(access, request):

@staticmethod
def _apply_ref_or_type_filter(job_type, process, service):
# type: (Optional[str], Optional[str], Optional[str]) -> MongodbSearchFilter
# type: (Optional[str], Optional[str], Optional[str]) -> MongodbAggregateExpression

search_filters = {} # type: MongodbSearchFilter
search_filters = {} # type: MongodbAggregateExpression
if job_type == "process":
search_filters["service"] = None
elif job_type == "provider":
Expand All @@ -971,8 +983,8 @@ def _apply_ref_or_type_filter(job_type, process, service):

@staticmethod
def _apply_status_filter(status):
# type: (Optional[str]) -> MongodbSearchFilter
search_filters = {} # type: MongodbSearchFilter
# type: (Optional[str]) -> MongodbAggregateExpression
search_filters = {} # type: MongodbAggregateExpression
if status in JOB_STATUS_CATEGORIES:
category_statuses = list(JOB_STATUS_CATEGORIES[status])
search_filters["status"] = {"$in": category_statuses}
Expand All @@ -982,7 +994,7 @@ def _apply_status_filter(status):

@staticmethod
def _apply_datetime_filter(datetime_interval):
# type: (Optional[DatetimeIntervalType]) -> MongodbSearchFilter
# type: (Optional[DatetimeIntervalType]) -> MongodbAggregateExpression
search_filters = {}
if datetime_interval is not None:
if datetime_interval.get("after", False):
Expand All @@ -999,7 +1011,7 @@ def _apply_datetime_filter(datetime_interval):

@staticmethod
def _apply_duration_filter(pipeline, min_duration, max_duration):
# type: (MongodbSearchPipeline, Optional[int], Optional[int]) -> MongodbSearchPipeline
# type: (MongodbAggregatePipeline, Optional[int], Optional[int]) -> MongodbAggregatePipeline
"""
Generate the filter required for comparing against :meth:`Job.duration`.
Expand Down
2 changes: 1 addition & 1 deletion weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ class DeployMinMaxOccurs(ExtendedMappingSchema):
# does not inherit from 'DescriptionLinks' because other 'ProcessDescription<>' schema depend from this without 'links'
class ProcessDescriptionType(DescriptionBase, DescriptionExtra):
id = ProcessIdentifier()
version = Version(missing=drop, example="1.2.3")
version = Version(missing=None, example="1.2.3")
mutable = ExtendedSchemaNode(Boolean(), default=True, description=(
"Indicates if the process is mutable (dynamically deployed), or immutable (builtin with this instance)."
))
Expand Down

0 comments on commit 417cfcc

Please sign in to comment.