Skip to content

Commit

Permalink
Issue #668 support federation extension on list_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Jan 31, 2025
1 parent 156506d commit 97ec3bf
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 28 deletions.
39 changes: 27 additions & 12 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from openeo.rest.mlmodel import MlModel
from openeo.rest.models.general import (
CollectionListingResponse,
JobListingResponse,
ProcessListingResponse,
UserDefinedProcessListingResponse,
)
Expand Down Expand Up @@ -687,8 +688,13 @@ def list_collections(self) -> CollectionListingResponse:
it is recommended to use :py:meth:`~openeo.rest.connection.Connection.describe_collection` instead.
:return: list of dictionaries with basic collection metadata.
.. versionchanged:: 0.38.0
Returns a :py:class:`~openeo.rest.models.general.CollectionListingResponse` object
instead of a simple ``List[dict]``.
"""
# TODO: add caching #383, but reset cache on auth change #254
# TODO #677 add pagination support?
data = self.get("/collections", expected_status=200).json()
return CollectionListingResponse(data)

Expand Down Expand Up @@ -834,15 +840,20 @@ def list_processes(self, namespace: Optional[str] = None) -> ProcessListingRespo
:param namespace: The namespace for which to list processes.
:return: listing of available processes
.. versionchanged:: 0.38.0
Returns a :py:class:`~openeo.rest.models.general.ProcessListingResponse` object
instead of a simple ``List[dict]``.
"""
# TODO: Maybe format the result dictionary so that the process_id is the key of the dictionary.
# TODO #677 add pagination support?
if namespace is None:
response = self._capabilities_cache.get(
key=("processes", "backend"), load=lambda: self.get("/processes", expected_status=200).json()
)
else:
response = self.get("/processes/" + namespace, expected_status=200).json()
return ProcessListingResponse(data=response)
return ProcessListingResponse(response_data=response)

def describe_process(self, id: str, namespace: Optional[str] = None) -> dict:
"""
Expand All @@ -861,26 +872,25 @@ def describe_process(self, id: str, namespace: Optional[str] = None) -> dict:

raise OpenEoClientException("Process does not exist.")

def list_jobs(self, limit: Union[int, None] = None) -> List[dict]:
def list_jobs(self, limit: Union[int, None] = None) -> JobListingResponse:
"""
Lists all jobs of the authenticated user.
Lists (batch) jobs metadata of the authenticated user.
:param limit: maximum number of jobs to return. Setting this limit enables pagination.
:return: job_list: Dict of all jobs of the user.
.. versionadded:: 0.36.0
.. versionchanged:: 0.36.0
Added ``limit`` argument
.. versionchanged:: 0.38.0
Returns a :py:class:`~openeo.rest.models.general.JobListingResponse` object
instead of simple ``List[dict]``.
"""
# TODO: Parse the result so that Job classes returned?
resp = self.get("/jobs", params={"limit": limit}, expected_status=200).json()
if resp.get("federation:missing"):
_log.warning("Partial user job listing due to missing federation components: {c}".format(
c=",".join(resp["federation:missing"])
))
# TODO: when pagination is enabled: how to expose link to next page?
jobs = resp["jobs"]
return VisualList("data-table", data=jobs, parameters={'columns': 'jobs'})
resp = self.get("/jobs", params={"limit": limit}, expected_status=200).json()
return JobListingResponse(response_data=resp)

def assert_user_defined_process_support(self):
"""
Expand Down Expand Up @@ -935,10 +945,15 @@ def save_user_defined_process(
def list_user_defined_processes(self) -> UserDefinedProcessListingResponse:
"""
Lists all user-defined processes of the authenticated user.
.. versionchanged:: 0.38.0
Returns a :py:class:`~openeo.rest.models.general.UserDefinedProcessListingResponse` object
instead of a simple ``List[dict]``.
"""
# TODO #677 add pagination support?
self.assert_user_defined_process_support()
data = self.get("/process_graphs", expected_status=200).json()
return UserDefinedProcessListingResponse(data=data)
return UserDefinedProcessListingResponse(response_data=data)

def user_defined_process(self, user_defined_process_id: str) -> RESTUserDefinedProcess:
"""
Expand Down
10 changes: 10 additions & 0 deletions openeo/rest/models/federation_extension.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
from typing import List, Union

_log = logging.getLogger(__name__)

class FederationExtension:
"""
Expand All @@ -22,3 +24,11 @@ def missing(self) -> Union[List[str], None]:
Or None, when ``federation:missing`` is not present in response.
"""
return self._data.get("federation:missing", None)

def warn_on_missing(self, resource_name: str) -> None:
"""
Warn about presence of ``federation:missing`` in the resource.
"""
missing = self.missing
if missing:
_log.warning(f"Partial {resource_name}: missing federation components: {missing!r}.")
75 changes: 62 additions & 13 deletions openeo/rest/models/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def from_dict(cls, data: dict) -> Link:
return cls(rel=data["rel"], href=data["href"], type=data.get("type"), title=data.get("title"))

# TODO: add _html_repr_ for Jupyter integration
# TODO: also provide container for list of links with methods to easily look up by `rel` or `type`


class CollectionListingResponse(list):
Expand All @@ -38,7 +39,7 @@ class CollectionListingResponse(list):
:py:meth:`~openeo.rest.connection.Connection.list_collections()`,
but now also provides methods/properties to access additional response data.
:param data: response data from a ``GET /collections`` request
:param response_data: response data from a ``GET /collections`` request
.. seealso:: :py:meth:`openeo.rest.connection.Connection.list_collections()`
Expand All @@ -47,10 +48,12 @@ class CollectionListingResponse(list):

__slots__ = ["_data"]

def __init__(self, data: dict):
self._data = data
def __init__(self, response_data: dict, *, warn_on_federation_missing: bool = True):
self._data = response_data
# Mimic original list of collection metadata dictionaries
super().__init__(data["collections"])
super().__init__(response_data["collections"])
if warn_on_federation_missing:
self.ext_federation.warn_on_missing(resource_name="collection listing")

def _repr_html_(self):
return render_component(component="collections", data=self)
Expand All @@ -77,7 +80,7 @@ class ProcessListingResponse(list):
:py:meth:`~openeo.rest.connection.Connection.list_processes()`,
but now also provides methods/properties to access additional response data.
:param data: response data from a ``GET /processes`` request
:param response_data: response data from a ``GET /processes`` request
.. seealso:: :py:meth:`openeo.rest.connection.Connection.list_processes()`
Expand All @@ -86,10 +89,12 @@ class ProcessListingResponse(list):

__slots__ = ["_data"]

def __init__(self, data: dict):
self._data = data
def __init__(self, response_data: dict, *, warn_on_federation_missing: bool = True):
self._data = response_data
# Mimic original list of process metadata dictionaries
super().__init__(data["processes"])
super().__init__(response_data["processes"])
if warn_on_federation_missing:
self.ext_federation.warn_on_missing(resource_name="process listing")

def _repr_html_(self):
return render_component(
Expand Down Expand Up @@ -118,20 +123,23 @@ class UserDefinedProcessListingResponse(list):
:py:meth:`~openeo.rest.connection.Connection.list_user_defined_processes()`,
but now also provides methods/properties to access additional response data.
:param data: response data from a ``GET /process_graphs`` request
:param response_data: response data from a ``GET /process_graphs`` request
.. seealso:: :py:meth:`openeo.rest.connection.Connection.list_user_defined_processes()`
.. versionadded:: 0.38.0
"""

# TODO: this is a copy of ProcessListingResponse, but with different class name

__slots__ = ["_data"]

def __init__(self, data: dict):
self._data = data
def __init__(self, response_data: dict, *, warn_on_federation_missing: bool = True):
self._data = response_data
# Mimic original list of process metadata dictionaries
super().__init__(data["processes"])
super().__init__(response_data["processes"])
if warn_on_federation_missing:
self.ext_federation.warn_on_missing(resource_name="process listing")

def _repr_html_(self):
return render_component(
Expand All @@ -147,3 +155,44 @@ def links(self) -> List[Link]:
def ext_federation(self) -> FederationExtension:
"""Accessor for federation extension data related to this resource."""
return FederationExtension(self._data)


class JobListingResponse(list):
"""
Container for job metadata listing received
from a ``GET /jobs`` request.
.. note::
This object mimics a simple ``List[dict]`` with job metadata,
which was the original return API of
:py:meth:`~openeo.rest.connection.Connection.list_jobs()`,
but now also provides methods/properties to access additional response data.
:param response_data: response data from a ``GET /jobs`` request
.. seealso:: :py:meth:`openeo.rest.connection.Connection.list_jobs()`
.. versionadded:: 0.38.0
"""

__slots__ = ["_data"]

def __init__(self, response_data: dict, *, warn_on_federation_missing: bool = True):
self._data = response_data
# Mimic original list of process metadata dictionaries
super().__init__(response_data["jobs"])
if warn_on_federation_missing:
self.ext_federation.warn_on_missing(resource_name="job listing")

def _repr_html_(self):
return render_component(component="data-table", data=self, parameters={"columns": "jobs"})

@property
def links(self) -> List[Link]:
"""Get links related to this resource."""
return [Link.from_dict(d) for d in self._data.get("links", [])]

@property
def ext_federation(self) -> FederationExtension:
"""Accessor for federation extension data related to this resource."""
return FederationExtension(self._data)
9 changes: 6 additions & 3 deletions tests/rest/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3330,7 +3330,7 @@ def test_list_collections(requests_mock):
assert con.list_collections() == collections


def test_list_collections_extra_metadata(requests_mock):
def test_list_collections_extra_metadata(requests_mock, caplog):
requests_mock.get(API_URL, json={"api_version": "1.0.0"})
requests_mock.get(
API_URL + "collections",
Expand All @@ -3345,6 +3345,7 @@ def test_list_collections_extra_metadata(requests_mock):
assert collections == [{"id": "S2"}, {"id": "NDVI"}]
assert collections.links == [Link(rel="next", href="https://oeo.test/collections?page=2", type=None, title=None)]
assert collections.ext_federation.missing == ["oeob"]
assert "Partial collection listing: missing federation components: ['oeob']." in caplog.text


def test_describe_collection(requests_mock):
Expand Down Expand Up @@ -3407,7 +3408,7 @@ def test_list_processes_namespace(requests_mock):
assert m.call_count == 1


def test_list_processes_extra_metadata(requests_mock):
def test_list_processes_extra_metadata(requests_mock, caplog):
requests_mock.get(API_URL, json={"api_version": "1.0.0"})
m = requests_mock.get(
API_URL + "processes",
Expand All @@ -3422,6 +3423,7 @@ def test_list_processes_extra_metadata(requests_mock):
assert processes == [{"id": "add"}, {"id": "mask"}]
assert processes.links == [Link(rel="next", href="https://oeo.test/processes?page=2", type=None, title=None)]
assert processes.ext_federation.missing == ["oeob"]
assert "Partial process listing: missing federation components: ['oeob']." in caplog.text


def test_get_job(requests_mock):
Expand Down Expand Up @@ -3717,7 +3719,7 @@ def test_list_udps(self, requests_mock, test_data):
user_udps = conn.list_user_defined_processes()
assert user_udps == [udp]

def test_list_udps_extra_metadata(self, requests_mock, test_data):
def test_list_udps_extra_metadata(self, requests_mock, test_data, caplog):
requests_mock.get(API_URL, json=build_capabilities(udp=True))
requests_mock.get(
API_URL + "process_graphs",
Expand All @@ -3733,6 +3735,7 @@ def test_list_udps_extra_metadata(self, requests_mock, test_data):
assert udps == [{"id": "myevi"}]
assert udps.links == [Link(rel="about", href="https://oeo.test/my-evi")]
assert udps.ext_federation.missing == ["oeob"]
assert "Partial process listing: missing federation components: ['oeob']." in caplog.text


def test_list_udps_unsupported(self, requests_mock):
Expand Down
45 changes: 45 additions & 0 deletions tests/rest/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import openeo.rest.job
from openeo.rest import JobFailedException, OpenEoApiPlainError, OpenEoClientException
from openeo.rest.job import BatchJob, ResultAsset
from openeo.rest.models.general import Link

from .test_connection import _credentials_basic_handler

Expand Down Expand Up @@ -762,3 +763,47 @@ def get_jobs(request, context):
{"id": "job123", "status": "running", "created": "2021-02-22T09:00:00Z"},
{"id": "job456", "status": "created", "created": "2021-03-22T10:00:00Z"},
]


def test_list_jobs_extra_metadata(con100, requests_mock, caplog):
# TODO: avoid this boilerplate duplication
username = "john"
password = "j0hn!"
access_token = "6cc35!"
requests_mock.get(
API_URL + "/credentials/basic",
text=_credentials_basic_handler(username=username, password=password, access_token=access_token),
)

def get_jobs(request, context):
assert request.headers["Authorization"] == f"Bearer basic//{access_token}"
return {
"jobs": [
{
"id": "job123",
"status": "running",
"created": "2021-02-22T09:00:00Z",
},
{
"id": "job456",
"status": "created",
"created": "2021-03-22T10:00:00Z",
},
],
"links": [
{"rel": "next", "href": API_URL + "/jobs?limit=2&offset=2"},
],
"federation:missing": ["oeob"],
}

requests_mock.get(API_URL + "/jobs", json=get_jobs)

con100.authenticate_basic(username, password)
jobs = con100.list_jobs()
assert jobs == [
{"id": "job123", "status": "running", "created": "2021-02-22T09:00:00Z"},
{"id": "job456", "status": "created", "created": "2021-03-22T10:00:00Z"},
]
assert jobs.links == [Link(rel="next", href="https://oeo.test/jobs?limit=2&offset=2")]
assert jobs.ext_federation.missing == ["oeob"]
assert "Partial job listing: missing federation components: ['oeob']." in caplog.text

0 comments on commit 97ec3bf

Please sign in to comment.