Skip to content

Commit 858c06d

Browse files
committed
Issue #385 Improve support for running UDF directly on vector cube
1 parent 4f3fc71 commit 858c06d

File tree

8 files changed

+392
-149
lines changed

8 files changed

+392
-149
lines changed

docs/api.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ openeo.rest.datacube
2727
:inherited-members:
2828
:special-members: __init__
2929

30+
.. automodule:: openeo.rest._datacube
31+
:members: UDF
32+
3033

3134
openeo.rest.vectorcube
3235
------------------------

openeo/rest/_datacube.py

Lines changed: 173 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
import json
22
import logging
3+
import pathlib
4+
import re
35
import sys
46
import typing
5-
from pathlib import Path
6-
from typing import Optional, Union, Tuple
77
import uuid
8+
import warnings
9+
from pathlib import Path
10+
from typing import Union, Tuple, Optional
11+
12+
import requests
813

14+
import openeo.processes
915
from openeo.internal.compat import nullcontext
1016
from openeo.internal.graph_building import PGNode, _FromNodeMixin
1117
from openeo.internal.jupyter import render_component
18+
from openeo.internal.warnings import UserDeprecationWarning
19+
from openeo.rest import OpenEoClientException
20+
from openeo.util import dict_no_none
1221

1322
if typing.TYPE_CHECKING:
1423
# Imports for type checking only (circular import issue at runtime).
@@ -136,8 +145,166 @@ def _build_pgnode(
136145
def _repr_html_(self):
    """
    Build the HTML representation of this object's process graph.

    The flat process graph is wrapped as ``{"process_graph": ...}`` and handed
    to :py:func:`render_component` as a "model-builder" component.
    """
    return render_component(
        "model-builder",
        data={"process_graph": self.flat_graph()},
        parameters={
            # Random hex id, newly generated on every call.
            "id": uuid.uuid4().hex,
            "explicit-zoom": True,
            "height": "400px",
        },
    )
153+
154+
155+
class UDF:
    """
    Helper class to load UDF code (e.g. from file) and embed it as "callback" or child process in a process graph.

    Usage example:

    .. code-block:: python

        udf = UDF.from_file("my-udf-code.py")
        cube = cube.apply(process=udf)


    .. versionchanged:: 0.13.0
        Added auto-detection of ``runtime``.
        Specifying the ``data`` argument is not necessary anymore, and actually deprecated.
        Added :py:meth:`from_file` to simplify loading UDF code from a file.
        See :ref:`old_udf_api` for more background about the changes.
    """

    __slots__ = ["code", "_runtime", "version", "context", "_source"]

    def __init__(
        self,
        code: str,
        runtime: Optional[str] = None,
        data=None,  # TODO #181 remove `data` argument
        version: Optional[str] = None,
        context: Optional[dict] = None,
        _source=None,
    ):
        """
        Construct a UDF object from given code string and other arguments related to the ``run_udf`` process.

        :param code: UDF source code string (Python, R, ...)
        :param runtime: optional UDF runtime identifier, will be autodetected from source code if omitted.
        :param data: unused leftover from old API. Don't use this argument, it will be removed in a future release.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        :param _source: (for internal use) source identifier:
            a :py:class:`pathlib.Path` or URL string, used for runtime auto-detection.
        """
        # TODO: automatically dedent code (when literal string) ?
        self.code = code
        self._runtime = runtime
        self.version = version
        self.context = context
        self._source = _source
        if data is not None:
            # TODO #181 remove `data` argument
            warnings.warn(
                f"The `data` argument of `{self.__class__.__name__}` is deprecated, unused and will be removed in a future release.",
                category=UserDeprecationWarning,
                stacklevel=2,
            )

    def get_runtime(self, connection: "openeo.Connection") -> str:
        """
        Get the UDF runtime identifier.

        Returns the explicitly given runtime if there is one (the connection is not used then),
        otherwise the runtime is guessed from the UDF source/code
        and matched against the runtimes listed by the connection.

        :param connection: connection used to list the available UDF runtimes
        :raises OpenEoClientException: if the runtime has to be guessed and that fails
        """
        return self._runtime or self._guess_runtime(connection=connection)

    @classmethod
    def from_file(
        cls,
        path: Union[str, pathlib.Path],
        runtime: Optional[str] = None,
        version: Optional[str] = None,
        context: Optional[dict] = None,
    ) -> "UDF":
        """
        Load a UDF from a local file.

        .. seealso::
            :py:meth:`from_url` for loading from a URL.

        :param path: path to the local file with UDF source code
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        """
        path = pathlib.Path(path)
        code = path.read_text(encoding="utf-8")
        # Keep the path as source identifier: its suffix is used for runtime auto-detection.
        return cls(
            code=code, runtime=runtime, version=version, context=context, _source=path
        )

    @classmethod
    def from_url(
        cls,
        url: str,
        runtime: Optional[str] = None,
        version: Optional[str] = None,
        context: Optional[dict] = None,
    ) -> "UDF":
        """
        Load a UDF from a URL.

        .. seealso::
            :py:meth:`from_file` for loading from a local file.

        :param url: URL path to load the UDF source code from
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        :raises requests.HTTPError: if the server responds with an error status
        """
        # NOTE(review): no timeout is passed here, so an unresponsive server
        # can block this call indefinitely -- consider adding one.
        resp = requests.get(url)
        resp.raise_for_status()
        code = resp.text
        # Keep the URL as source identifier: its suffix is used for runtime auto-detection.
        return cls(
            code=code, runtime=runtime, version=version, context=context, _source=url
        )

    def _guess_runtime(self, connection: "openeo.Connection") -> str:
        """
        Guess UDF runtime from UDF source (path) or source code.

        :param connection: connection used to list the available UDF runtimes
        :return: runtime identifier, spelled as listed by the connection
        :raises OpenEoClientException: if the language can not be detected
            or if it does not match (case-insensitively) any listed runtime
        """
        # First, guess UDF language
        language = None
        if isinstance(self._source, pathlib.Path):
            language = self._guess_runtime_from_suffix(self._source.suffix)
        elif isinstance(self._source, str):
            # The file suffix may be followed by a query string ("?...") or fragment ("#...").
            # A bare "&" is accepted too, for backward compatibility.
            url_match = re.match(
                r"https?://.*?(?P<suffix>\.\w+)([?&#].*)?$", self._source
            )
            if url_match:
                language = self._guess_runtime_from_suffix(url_match.group("suffix"))
        if not language:
            # Guess language from UDF code
            if re.search(r"^def [\w0-9_]+\(", self.code, flags=re.MULTILINE):
                language = "Python"
            # TODO: detection heuristics for R and other languages?
        if not language:
            raise OpenEoClientException("Failed to detect language of UDF code.")
        # Find runtime for language (case-insensitive match, but return the original spelling)
        runtimes = {k.lower(): k for k in connection.list_udf_runtimes().keys()}
        if language.lower() in runtimes:
            return runtimes[language.lower()]
        else:
            raise OpenEoClientException(
                f"Failed to match UDF language {language!r} with a runtime ({runtimes})"
            )

    def _guess_runtime_from_suffix(self, suffix: str) -> Optional[str]:
        """
        Map a file suffix (including the leading dot, case-insensitive) to a UDF language.

        :return: language name, or ``None`` if the suffix is not recognized
        """
        return {
            ".py": "Python",
            ".r": "R",
        }.get(suffix.lower())

    def get_run_udf_callback(
        self, connection: "openeo.Connection", data_parameter: str = "data"
    ) -> "PGNode":
        """
        For internal use: construct `run_udf` node to be used as callback in `apply`, `reduce_dimension`, ...

        :param connection: connection used to resolve the UDF runtime (see :py:meth:`get_runtime`)
        :param data_parameter: name of the parent process parameter to pass as ``data`` to ``run_udf``
        """
        # `dict_no_none` is used so that unset optional fields (e.g. `version`, `context`) are left out.
        arguments = dict_no_none(
            data={"from_parameter": data_parameter},
            udf=self.code,
            runtime=self.get_runtime(connection=connection),
            version=self.version,
            context=self.context,
        )
        return PGNode(process_id="run_udf", arguments=arguments)

0 commit comments

Comments
 (0)