Skip to content

Commit

Permalink
Merge pull request #5389 from chu11/issue5327_python_kvs_modernize_kv…
Browse files Browse the repository at this point in the history
…swatch

python: support interface to perform KVS watch
  • Loading branch information
mergify[bot] authored Jan 30, 2024
2 parents cb6bc7b + 4a58554 commit 556d42c
Show file tree
Hide file tree
Showing 3 changed files with 370 additions and 52 deletions.
67 changes: 21 additions & 46 deletions src/bindings/python/flux/job/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import json

from _flux._core import ffi
from flux.future import Future
from flux.job._wrapper import _RAW as RAW
from flux.kvs import WatchImplementation

# Names of events that may appear in the main eventlog (i.e. ``eventlog="eventlog"``)
# See Flux RFC 21 for documentation on each event.
Expand Down Expand Up @@ -74,23 +74,32 @@ def context_string(self):
)


class JobEventWatchFuture(Future):
class JobEventWatchFuture(WatchImplementation):
"""
A future returned from job.event_watch_async().
Adds get_event() method to return an EventLogEntry event
"""

def __del__(self):
if self.needs_cancel is not False:
self.cancel()
try:
super().__del__()
except AttributeError:
pass

def __init__(self, future_handle):
super().__init__(future_handle)
self.needs_cancel = True

def watch_get(self, future):
"""
Implementation of watch_get() for JobEventWatchFuture.
Will be called from WatchABC.get()
"""
result = ffi.new("char *[1]")
RAW.event_watch_get(future, result)
return EventLogEvent(ffi.string(result[0]).decode("utf-8"))

def watch_cancel(self, future):
"""
Implementation of watch_cancel() for JobEventWatchFuture.
Will be called from WatchABC.cancel()
"""
RAW.event_watch_cancel(future)

def get_event(self, autoreset=True):
"""
Expand All @@ -101,41 +110,7 @@ def get_event(self, autoreset=True):
call to get_event() will try to fetch the next event and thus
may block.
"""
result = ffi.new("char *[1]")
try:
# Block until Future is ready:
self.wait_for()
RAW.event_watch_get(self.pimpl, result)
except OSError as exc:
if exc.errno == errno.ENODATA:
self.needs_cancel = False
return None
# raise handle exception if there is one
self.raise_if_handle_exception()
# re-raise all other exceptions
#
# Note: overwrite generic OSError strerror string with the
# EventWatch future error string to give the caller appropriate
# detail (e.g. instead of "No such file or directory" use
# "job <jobid> does not exist"
#
exc.strerror = self.error_string()
raise
event = EventLogEvent(ffi.string(result[0]).decode("utf-8"))
if autoreset is True:
self.reset()
return event

def cancel(self, stop=False):
"""Cancel a streaming job.event_watch_async() future
If stop=True, then deactivate the multi-response future so no
further callbacks are called.
"""
RAW.event_watch_cancel(self.pimpl)
self.needs_cancel = False
if stop:
self.stop()
return self.get(autoreset=autoreset)


def event_watch_async(flux_handle, jobid, eventlog="eventlog"):
Expand Down
154 changes: 148 additions & 6 deletions src/bindings/python/flux/kvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import errno
import json
import os
from abc import ABC, abstractmethod
from typing import Any, Mapping

import flux.constants
from _flux._core import ffi, lib
from flux.future import Future
from flux.rpc import RPC
Expand All @@ -31,19 +33,24 @@ class KVSWrapper(Wrapper):
RAW.flux_kvsitr_next.set_error_check(lambda x: False)


def _get_value(valp):
try:
ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
except json.decoder.JSONDecodeError:
ret = ffi.string(valp[0]).decode("utf-8")
except UnicodeDecodeError:
ret = ffi.string(valp[0])
return ret


def get_key_direct(flux_handle, key, namespace=None):
valp = ffi.new("char *[1]")
future = RAW.flux_kvs_lookup(flux_handle, namespace, 0, key)
RAW.flux_kvs_lookup_get(future, valp)
if valp[0] == ffi.NULL:
return None

try:
ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
except json.decoder.JSONDecodeError:
ret = ffi.string(valp[0]).decode("utf-8")
except UnicodeDecodeError:
ret = ffi.string(valp[0])
ret = _get_value(valp)
RAW.flux_future_destroy(future)
return ret

Expand Down Expand Up @@ -773,3 +780,138 @@ def walk(directory, topdown=False, flux_handle=None, namespace=None):
raise ValueError("If directory is a key, flux_handle must be specified")
directory = KVSDir(flux_handle, directory, namespace=namespace)
return _inner_walk(directory, "", topdown, namespace=namespace)


class WatchImplementation(Future, ABC):
"""
Interface for KVS based watchers
Users to implement watch_get() and watch_cancel() functions.
"""

def __del__(self):
if self.needs_cancel is not False:
self.cancel()
try:
super().__del__()
except AttributeError:
pass

def __init__(self, future_handle):
super().__init__(future_handle)
self.needs_cancel = True

@abstractmethod
def watch_get(self, future):
pass

@abstractmethod
def watch_cancel(self, future):
pass

def get(self, autoreset=True):
"""
Return the new value or None if the stream has terminated.
The future is auto-reset unless autoreset=False, so a subsequent
call to get() will try to fetch the next value and thus
may block.
"""
try:
# Block until Future is ready:
self.wait_for()
ret = self.watch_get(self.pimpl)
except OSError as exc:
if exc.errno == errno.ENODATA:
self.needs_cancel = False
return None
# raise handle exception if there is one
self.raise_if_handle_exception()
# re-raise all other exceptions
#
# Note: overwrite generic OSError strerror string with the
# EventWatch future error string to give the caller appropriate
# detail (e.g. instead of "No such file or directory" use
# "job <jobid> does not exist"
#
exc.strerror = self.error_string()
raise
if autoreset is True:
self.reset()
return ret

def cancel(self, stop=False):
"""Cancel a streaming future
If stop=True, then deactivate the multi-response future so no
further callbacks are called.
"""
self.watch_cancel(self.pimpl)
self.needs_cancel = False
if stop:
self.stop()


class KVSWatchFuture(WatchImplementation):
"""
A future returned from kvs_watch_async().
"""

def __init__(self, future_handle):
super().__init__(future_handle)

def watch_get(self, future):
"""
Implementation of watch_get() for KVSWatchFuture.
Will be called from WatchABC.get()
"""
valp = ffi.new("char *[1]")
RAW.flux_kvs_lookup_get(future, valp)
return _get_value(valp)

def watch_cancel(self, future):
"""
Implementation of watch_cancel() for KVSWatchFuture.
Will be called from WatchABC.cancel()
"""
RAW.flux_kvs_lookup_cancel(future)


def kvs_watch_async(
flux_handle, key, namespace=None, waitcreate=False, uniq=False, full=False
):
"""Asynchronously get KVS updates for a key
Args:
flux_handle: A Flux handle obtained from flux.Flux()
key: the key on which to watch
namespace: namespace to read from, defaults to None. If namespace
is None, the namespace specified in the FLUX_KVS_NAMESPACE
environment variable will be used. If FLUX_KVS_NAMESPACE is not
set, the primary namespace will be used.
waitcreate: If True and a key does not yet exist, will wait
for it to exit. Defaults to False.
uniq: If True, only different values will be returned by
watch. Defaults to False.
full: If True, any change that can affect the key is
monitored. Typically, this is to capture when a parent directory
is removed or altered in some way. Typically kvs watch will not
detect this as the exact key has not been changed. Defaults to
False.
Returns:
KVSWatchFuture: a KVSWatchFuture object. Call .get() from the then
callback to get the currently returned value from the Future object.
"""

flags = flux.constants.FLUX_KVS_WATCH
if waitcreate:
flags |= flux.constants.FLUX_KVS_WAITCREATE
if uniq:
flags |= flux.constants.FLUX_KVS_WATCH_UNIQ
if full:
flags |= flux.constants.FLUX_KVS_WATCH_FULL
future = RAW.flux_kvs_lookup(flux_handle, namespace, flags, key)
return KVSWatchFuture(future)
Loading

0 comments on commit 556d42c

Please sign in to comment.