Skip to content

Commit

Permalink
Adding support for triggered functions (TFUNCTION) (#2861)
Browse files Browse the repository at this point in the history
Co-authored-by: Chayim I. Kirshen <[email protected]>
Co-authored-by: dvora-h <[email protected]>
Co-authored-by: Chayim <[email protected]>
  • Loading branch information
4 people authored Aug 8, 2023
1 parent b0abd55 commit d5c2d1d
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 0 deletions.
1 change: 1 addition & 0 deletions dockers/cluster.redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ loadmodule /opt/redis-stack/lib/redisgraph.so
loadmodule /opt/redis-stack/lib/redistimeseries.so
loadmodule /opt/redis-stack/lib/rejson.so
loadmodule /opt/redis-stack/lib/redisbloom.so
loadmodule /opt/redis-stack/lib/redisgears.so v8-plugin-path /opt/redis-stack/lib/libredisgears_v8_plugin.so
6 changes: 6 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ class AbstractRedisCluster:
"READONLY",
"READWRITE",
"TIME",
"TFUNCTION LOAD",
"TFUNCTION DELETE",
"TFUNCTION LIST",
"TFCALL",
"TFCALLASYNC",
"GRAPH.CONFIG",
"LATENCY HISTORY",
"LATENCY LATEST",
Expand All @@ -298,6 +303,7 @@ class AbstractRedisCluster:
"FUNCTION LIST",
"FUNCTION LOAD",
"FUNCTION RESTORE",
"REDISGEARS_2.REFRESHCLUSTER",
"SCAN",
"SCRIPT EXISTS",
"SCRIPT FLUSH",
Expand Down
10 changes: 10 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
AsyncACLCommands,
AsyncDataAccessCommands,
AsyncFunctionCommands,
AsyncGearsCommands,
AsyncManagementCommands,
AsyncScriptCommands,
DataAccessCommands,
FunctionCommands,
GearsCommands,
ManagementCommands,
PubSubCommands,
ResponseT,
Expand Down Expand Up @@ -689,6 +691,12 @@ def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
self.read_from_replicas = False
return self.execute_command("READWRITE", target_nodes=target_nodes)

def gears_refresh_cluster(self, **kwargs) -> ResponseT:
"""
On an OSS cluster, before executing any gears function, you must call this command. # noqa
"""
return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs)


class AsyncClusterManagementCommands(
ClusterManagementCommands, AsyncManagementCommands
Expand Down Expand Up @@ -864,6 +872,7 @@ class RedisClusterCommands(
ClusterDataAccessCommands,
ScriptCommands,
FunctionCommands,
GearsCommands,
RedisModuleCommands,
):
"""
Expand Down Expand Up @@ -893,6 +902,7 @@ class AsyncRedisClusterCommands(
AsyncClusterDataAccessCommands,
AsyncScriptCommands,
AsyncFunctionCommands,
AsyncGearsCommands,
):
"""
A class for all Redis Cluster commands
Expand Down
127 changes: 127 additions & 0 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6105,6 +6105,131 @@ def function_stats(self) -> Union[Awaitable[List], List]:
AsyncFunctionCommands = FunctionCommands


class GearsCommands:
def tfunction_load(
self, lib_code: str, replace: bool = False, config: Union[str, None] = None
) -> ResponseT:
"""
Load a new library to RedisGears.
``lib_code`` - the library code.
``config`` - a string representation of a JSON object
that will be provided to the library on load time,
for more information refer to
https://github.com/RedisGears/RedisGears/blob/master/docs/function_advance_topics.md#library-configuration
``replace`` - an optional argument, instructs RedisGears to replace the
function if its already exists
For more information see https://redis.io/commands/tfunction-load/
"""
pieces = []
if replace:
pieces.append("REPLACE")
if config is not None:
pieces.extend(["CONFIG", config])
pieces.append(lib_code)
return self.execute_command("TFUNCTION LOAD", *pieces)

def tfunction_delete(self, lib_name: str) -> ResponseT:
"""
Delete a library from RedisGears.
``lib_name`` the library name to delete.
For more information see https://redis.io/commands/tfunction-delete/
"""
return self.execute_command("TFUNCTION DELETE", lib_name)

def tfunction_list(
self,
with_code: bool = False,
verbose: int = 0,
lib_name: Union[str, None] = None,
) -> ResponseT:
"""
List the functions with additional information about each function.
``with_code`` Show libraries code.
``verbose`` output verbosity level, higher number will increase verbosity level
``lib_name`` specifying a library name (can be used multiple times to show multiple libraries in a single command) # noqa
For more information see https://redis.io/commands/tfunction-list/
"""
pieces = []
if with_code:
pieces.append("WITHCODE")
if verbose >= 1 and verbose <= 3:
pieces.append("v" * verbose)
else:
raise DataError("verbose can be 1, 2 or 3")
if lib_name is not None:
pieces.append("LIBRARY")
pieces.append(lib_name)

return self.execute_command("TFUNCTION LIST", *pieces)

def _tfcall(
self,
lib_name: str,
func_name: str,
keys: KeysT = None,
_async: bool = False,
*args: List,
) -> ResponseT:
pieces = [f"{lib_name}.{func_name}"]
if keys is not None:
pieces.append(len(keys))
pieces.extend(keys)
else:
pieces.append(0)
if args is not None:
pieces.extend(args)
if _async:
return self.execute_command("TFCALLASYNC", *pieces)
return self.execute_command("TFCALL", *pieces)

def tfcall(
self,
lib_name: str,
func_name: str,
keys: KeysT = None,
*args: List,
) -> ResponseT:
"""
Invoke a function.
``lib_name`` - the library name contains the function.
``func_name`` - the function name to run.
``keys`` - the keys that will be touched by the function.
``args`` - Additional argument to pass to the function.
For more information see https://redis.io/commands/tfcall/
"""
return self._tfcall(lib_name, func_name, keys, False, *args)

def tfcall_async(
self,
lib_name: str,
func_name: str,
keys: KeysT = None,
*args: List,
) -> ResponseT:
"""
Invoke an async function (coroutine).
``lib_name`` - the library name contains the function.
``func_name`` - the function name to run.
``keys`` - the keys that will be touched by the function.
``args`` - Additional argument to pass to the function.
For more information see https://redis.io/commands/tfcall/
"""
return self._tfcall(lib_name, func_name, keys, True, *args)


AsyncGearsCommands = GearsCommands


class DataAccessCommands(
BasicKeyCommands,
HyperlogCommands,
Expand Down Expand Up @@ -6148,6 +6273,7 @@ class CoreCommands(
PubSubCommands,
ScriptCommands,
FunctionCommands,
GearsCommands,
):
"""
A class containing all of the implemented redis commands. This class is
Expand All @@ -6164,6 +6290,7 @@ class AsyncCoreCommands(
AsyncPubSubCommands,
AsyncScriptCommands,
AsyncFunctionCommands,
AsyncGearsCommands,
):
"""
A class containing all of the implemented redis commands. This class is
Expand Down
51 changes: 51 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,57 @@ def teardown():
assert "client-info" in r.acl_log(count=1, target_nodes=node)[0]
assert r.acl_log_reset(target_nodes=node)

def generate_lib_code(self, lib_name):
return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa

def try_delete_libs(self, r, *lib_names):
for lib_name in lib_names:
try:
r.tfunction_delete(lib_name)
except Exception:
pass

@skip_if_server_version_lt("7.1.140")
def test_tfunction_load_delete(self, r):
r.gears_refresh_cluster()
self.try_delete_libs(r, "lib1")
lib_code = self.generate_lib_code("lib1")
assert r.tfunction_load(lib_code)
assert r.tfunction_delete("lib1")

@skip_if_server_version_lt("7.1.140")
def test_tfunction_list(self, r):
r.gears_refresh_cluster()
self.try_delete_libs(r, "lib1", "lib2", "lib3")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfunction_load(self.generate_lib_code("lib2"))
assert r.tfunction_load(self.generate_lib_code("lib3"))

# test error thrown when verbose > 4
with pytest.raises(DataError):
assert r.tfunction_list(verbose=8)

functions = r.tfunction_list(verbose=1)
assert len(functions) == 3

expected_names = [b"lib1", b"lib2", b"lib3"]
actual_names = [functions[0][13], functions[1][13], functions[2][13]]

assert sorted(expected_names) == sorted(actual_names)
assert r.tfunction_delete("lib1")
assert r.tfunction_delete("lib2")
assert r.tfunction_delete("lib3")

@skip_if_server_version_lt("7.1.140")
def test_tfcall(self, r):
r.gears_refresh_cluster()
self.try_delete_libs(r, "lib1")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfcall("lib1", "foo") == b"bar"
assert r.tfcall_async("lib1", "foo") == b"bar"

assert r.tfunction_delete("lib1")


@pytest.mark.onlycluster
class TestNodesManager:
Expand Down
51 changes: 51 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1791,6 +1791,57 @@ def test_substr(self, r):
assert r.substr("a", 3, 5) == b"345"
assert r.substr("a", 3, -2) == b"345678"

def generate_lib_code(self, lib_name):
return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa

def try_delete_libs(self, r, *lib_names):
for lib_name in lib_names:
try:
r.tfunction_delete(lib_name)
except Exception:
pass

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.1.140")
def test_tfunction_load_delete(self, r):
self.try_delete_libs(r, "lib1")
lib_code = self.generate_lib_code("lib1")
assert r.tfunction_load(lib_code)
assert r.tfunction_delete("lib1")

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.1.140")
def test_tfunction_list(self, r):
self.try_delete_libs(r, "lib1", "lib2", "lib3")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfunction_load(self.generate_lib_code("lib2"))
assert r.tfunction_load(self.generate_lib_code("lib3"))

# test error thrown when verbose > 4
with pytest.raises(redis.exceptions.DataError):
assert r.tfunction_list(verbose=8)

functions = r.tfunction_list(verbose=1)
assert len(functions) == 3

expected_names = [b"lib1", b"lib2", b"lib3"]
actual_names = [functions[0][13], functions[1][13], functions[2][13]]

assert sorted(expected_names) == sorted(actual_names)
assert r.tfunction_delete("lib1")
assert r.tfunction_delete("lib2")
assert r.tfunction_delete("lib3")

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.1.140")
def test_tfcall(self, r):
self.try_delete_libs(r, "lib1")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfcall("lib1", "foo") == b"bar"
assert r.tfcall_async("lib1", "foo") == b"bar"

assert r.tfunction_delete("lib1")

def test_ttl(self, r):
r["a"] = "1"
assert r.expire("a", 10)
Expand Down

0 comments on commit d5c2d1d

Please sign in to comment.