Skip to content

Commit

Permalink
Add reverse, snapshot, snapshot_when to the intf
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Nov 12, 2024
1 parent f16f676 commit 1acd511
Showing 1 changed file with 89 additions and 135 deletions.
224 changes: 89 additions & 135 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,95 @@ def tail(self: T, num_rows: int) -> T:
except Exception as e:
raise DHError(e, f"tail operation on the {self.__class__.__name__} failed.") from e

def reverse(self: T) -> T:
"""When called on a :class:`Table`, the reverse method creates a new table with all of the rows from this table
in reverse order.
When called on a :class:`PartitionedTableProxy`, the reverse method applies the reverse operation to all constituent
tables of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` with the result tables
as the constituents of its underlying partitioned table.
Returns:
a new :class:`Table` or :class:`PartitionedTableProxy`
Raises:
DHError
"""
try:
with auto_locking_ctx(self):
return self.__class__(self.j_object.reverse())
except Exception as e:
raise DHError(e, f"reverse operation on the {self.__class__.__name__} failed.") from e


def snapshot(self: T) -> T:
"""When called on a :class:`Table`, the snapshot method returns a static snapshot table.
When called on a :class:`PartitionedTableProxy`, the snapshot method applies the snapshot operation to all constituent
tables of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` with the result tables
as the constituents of its underlying partitioned table.
Returns:
a new :class:`Table` or :class:`PartitionedTableProxy`
Raises:
DHError
"""
try:
with auto_locking_ctx(self):
return self.__class__(self.j_object.snapshot())
except Exception as e:
raise DHError(e, "snapshot operation on the {self.__class__.__name__} failed.") from e

def snapshot_when(self: T, trigger_table: Union[Table, PartitionedTableProxy],
stamp_cols: Union[str, List[str]] = None, initial: bool = False, incremental: bool = False,
history: bool = False) -> T:
"""When called on a :class:`Table`, the snapshot_when method returns a table that captures a snapshot of this
table whenever trigger_table updates. When trigger_table updates, a snapshot of this table and the "stamp key"
from trigger_table form the resulting table. The "stamp key" is the last row of the trigger_table, limited by
the stamp_cols. If trigger_table is empty, the "stamp key" will be represented by NULL values.
Note: the trigger_table must be append-only when the history flag is set to True. If the trigger_table is not
append-only and has modified or removed rows in its updates, the result snapshot table will be put in a failure
state and become unusable.
When called on a :class:`PartitionedTableProxy`, the snapshot_when method applies the snapshot_when operation to
all constituent tables of the underlying partitioned table with the provided trigger :class:`Table or
:class:`.PartitionedTableProxy`, and produces a new :class:`PartitionedTableProxy` with the result tables as the
constituents of its underlying partitioned table. In the case of the trigger table being another :class:`PartitionedTableProxy`,
the snapshot_when operation is applied to the matching pairs of the constituent tables from both underlying
partitioned tables.
Args:
trigger_table (Union[Table, PartitionedTableProxy]): the trigger Table or PartitionedTableProxy which is only
allowed when called on a PartitionedTableProxy.
stamp_cols (Union[str, Sequence[str]): The columns from trigger_table that form the "stamp key", may be
renames. None, or empty, means that all columns from trigger_table form the "stamp key".
initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the
resulting table will remain empty until trigger_table first updates.
incremental (bool): Whether the resulting table should be incremental, default is False. When False, all
rows of this table will have the latest "stamp key". When True, only the rows of this table that have
been added or updated will have the latest "stamp key".
history (bool): Whether the resulting table should keep history, default is False. A history table appends a
full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history flag
is currently incompatible with initial and incremental: when history is True, incremental and initial
must be False.
Returns
a new :class:`Table` or :class:`PartitionedTableProxy`
Raises:
DHError
"""
try:
if isinstance(self, Table) and isinstance(trigger_table, PartitionedTableProxy):
raise ValueError("snapshot_when operation on a Table with a PartitionedTableProxy trigger_table is not supported.")

options = _JSnapshotWhenOptions.of(initial, incremental, history, to_sequence(stamp_cols))
with auto_locking_ctx(self, trigger_table):
return self.__class__(self.j_object.snapshotWhen(trigger_table.j_object, options))
except Exception as e:
raise DHError(e, f"snapshot_when operation on the PartitionedTableProxy{self.__class__.__name__} failed.") from e


class Table(JObjectWrapper, TableOperations["Table"]):
"""A Table represents a Deephaven table. It allows applications to perform powerful Deephaven table operations.
Expand Down Expand Up @@ -874,59 +963,6 @@ def remove_blink(self) -> Table:
"""Returns a non-blink child table, or this table if it is not a blink table."""
return Table(j_table=self.j_table.removeBlink())

def snapshot(self) -> Table:
"""Returns a static snapshot table.
Returns:
a new table
Raises:
DHError
"""
try:
with auto_locking_ctx(self):
return Table(j_table=self.j_table.snapshot())
except Exception as e:
raise DHError(message="failed to create a snapshot.") from e

def snapshot_when(self, trigger_table: Table, stamp_cols: Union[str, List[str]] = None, initial: bool = False,
incremental: bool = False, history: bool = False) -> Table:
"""Returns a table that captures a snapshot of this table whenever trigger_table updates.
When trigger_table updates, a snapshot of this table and the "stamp key" from trigger_table form the resulting
table. The "stamp key" is the last row of the trigger_table, limited by the stamp_cols. If trigger_table is
empty, the "stamp key" will be represented by NULL values.
Note: the trigger_table must be append-only when the history flag is set to True. If the trigger_table is not
append-only and has modified or removed rows in its updates, the result snapshot table will be put in a failure
state and become unusable.
Args:
trigger_table (Table): the trigger table
stamp_cols (Union[str, Sequence[str]): The columns from trigger_table that form the "stamp key", may be
renames. None, or empty, means that all columns from trigger_table form the "stamp key".
initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the
resulting table will remain empty until trigger_table first updates.
incremental (bool): Whether the resulting table should be incremental, default is False. When False, all
rows of this table will have the latest "stamp key". When True, only the rows of this table that have
been added or updated will have the latest "stamp key".
history (bool): Whether the resulting table should keep history, default is False. A history table appends a
full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history flag
is currently incompatible with initial and incremental: when history is True, incremental and initial
must be False.
Returns:
a new table
Raises:
DHError
"""
try:
options = _JSnapshotWhenOptions.of(initial, incremental, history, to_sequence(stamp_cols))
with auto_locking_ctx(self, trigger_table):
return Table(j_table=self.j_table.snapshotWhen(trigger_table.j_table, options))
except Exception as e:
raise DHError(message="failed to create a snapshot_when table.") from e

#
# Table operation category: Select
Expand Down Expand Up @@ -1322,20 +1358,6 @@ def sort_descending(self, order_by: Union[str, Sequence[str]]) -> Table:
except Exception as e:
raise DHError(e, "table sort_descending operation failed.") from e

def reverse(self) -> Table:
"""The reverse method creates a new table with all of the rows from this table in reverse order.
Returns:
a new table
Raises:
DHError
"""
try:
return Table(j_table=self.j_table.reverse())
except Exception as e:
raise DHError(e, "table reverse operation failed.") from e

def sort(self, order_by: Union[str, Sequence[str]],
order: Union[SortDirection, Sequence[SortDirection]] = None) -> Table:
"""The sort method creates a new table where the rows are ordered based on values in a specified set of columns.
Expand Down Expand Up @@ -2865,75 +2887,7 @@ def __init__(self, j_pt_proxy):
self.sanity_check_joins = self.j_pt_proxy.sanityChecksJoins()
self.target = PartitionedTable(j_partitioned_table=self.j_pt_proxy.target())

def reverse(self) -> PartitionedTableProxy:
"""Applies the :meth:`~Table.reverse` table operation to all constituent tables of the underlying partitioned
table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying
partitioned table.
Returns:
a new PartitionedTableProxy
Raises:
DHError
"""
try:
with auto_locking_ctx(self):
return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.reverse())
except Exception as e:
raise DHError(e, "reverse operation on the PartitionedTableProxy failed.") from e

def snapshot(self) -> PartitionedTableProxy:
"""Applies the :meth:`~Table.snapshot` table operation to all constituent tables of the underlying partitioned
table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying
partitioned table.
Returns:
a new PartitionedTableProxy
Raises:
DHError
"""
try:
with auto_locking_ctx(self):
return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.snapshot())
except Exception as e:
raise DHError(e, "snapshot operation on the PartitionedTableProxy failed.") from e

def snapshot_when(self, trigger_table: Union[Table, PartitionedTableProxy],
stamp_cols: Union[str, List[str]] = None, initial: bool = False, incremental: bool = False,
history: bool = False) -> PartitionedTableProxy:
"""Applies the :meth:`~Table.snapshot_when` table operation to all constituent tables of the underlying
partitioned table with the provided trigger table or PartitionedTableProxy, and produces a new
PartitionedTableProxy with the result tables as the constituents of its underlying partitioned table.

In the case of the trigger table being another PartitionedTableProxy, the :meth:`~Table.snapshot_when` table
operation is applied to the matching pairs of the constituent tables from both underlying partitioned tables.
Args:
trigger_table (Union[Table, PartitionedTableProxy]): the trigger Table or PartitionedTableProxy
stamp_cols (Union[str, Sequence[str]): The columns from trigger_table that form the "stamp key", may be
renames. None, or empty, means that all columns from trigger_table form the "stamp key".
initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the
resulting table will remain empty until trigger_table first updates.
incremental (bool): Whether the resulting table should be incremental, default is False. When False, all
rows of this table will have the latest "stamp key". When True, only the rows of this table that have
been added or updated will have the latest "stamp key".
history (bool): Whether the resulting table should keep history, default is False. A history table appends a
full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history flag
is currently incompatible with initial and incremental: when history is True, incremental and initial
must be False.
Returns:
a new PartitionedTableProxy
Raises:
DHError
"""
try:
options = _JSnapshotWhenOptions.of(initial, incremental, history, to_sequence(stamp_cols))
with auto_locking_ctx(self, trigger_table):
return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.snapshotWhen(trigger_table.j_object, options))
except Exception as e:
raise DHError(e, "snapshot_when operation on the PartitionedTableProxy failed.") from e

def sort(self, order_by: Union[str, Sequence[str]],
order: Union[SortDirection, Sequence[SortDirection]] = None) -> PartitionedTableProxy:
Expand Down

0 comments on commit 1acd511

Please sign in to comment.