Skip to content

Commit

Permalink
Added some comments and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
ahsimb committed Dec 20, 2024
1 parent 0e2a995 commit f8dd089
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 12 deletions.
5 changes: 5 additions & 0 deletions exasol/analytics/query_handler/graph/execution_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@


class ExecutionGraph(Generic[T]):
"""
The class is a generic implementation of a directed acyclic graph, with nodes
of type T.
The graph provides a node traversal order.
"""

def __init__(self, start_node: T, end_node: T, edges: Set[Tuple[T, T]]):
self._graph = nx.DiGraph()
Expand Down
3 changes: 3 additions & 0 deletions exasol/analytics/query_handler/graph/stage/sql/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Dataset:
The TableLike refers to a database table containing the actual data that
can be used for instance in training or testing.
Open question: a TableLike is basically a list of columns and a name, so why
do we have a separate list of columns here?
"""
table_like: TableLike
identifier_columns: List[Column]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

@dataclasses.dataclass(frozen=True, eq=True)
class SQLStageGraphExecutionInput:
"""
The class is an input for the graph query handler (not to be confused with
a node query handler provided by a user).
It includes the input data for the root node of the graph, the place in the BucketFS
where the result should be stored and the graph itself.
"""
input: SQLStageInputOutput
result_bucketfs_location: bfs.path.PathLike
sql_stage_graph: SQLStageGraph
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
class SQLStageGraphExecutionQueryHandler(
QueryHandler[SQLStageGraphExecutionInput, SQLStageInputOutput]
):
"""
Implementation of the execution graph QueryHandler. Each node of the graph contains
its own user-provided query handler.
"""

def __init__(
self,
parameter: SQLStageGraphExecutionInput,
Expand All @@ -34,12 +39,20 @@ def __init__(
self._state = query_handler_state_factory(parameter, query_handler_context)

def start(self) -> Union[Continue, Finish[SQLStageInputOutput]]:
# Start traversing the graph from the root node.
result = self._run_until_continue_or_last_stage_finished()
return result

def handle_query_result(
self, query_result: QueryResult
) -> Union[Continue, Finish[SQLStageInputOutput]]:
"""
Here we call the handle_query_result() of the query handler at the current node.
If this handler returns a Finish object then we proceed to the next node, if
there is one, and start traversing the graph from there (see
_run_until_continue_or_last_stage_finished()). Otherwise, if the returned object
is Continue, we stay on the same node and return the result.
"""
result = self._state.get_current_query_handler().handle_query_result(
query_result
)
Expand All @@ -57,6 +70,13 @@ def handle_query_result(
def _run_until_continue_or_last_stage_finished(
self,
) -> Union[Continue, Finish[SQLStageInputOutput]]:
"""
We start traversing the graph from the current node by calling start() on
its query handler. If the handler returns a Continue object we will exit the
loop and return the result. Otherwise, if the handler returns a Finish object,
we will move to the next node and call start() on its query handler. We will
carry on until a Continue object is returned or the whole graph is traversed.
"""
while True:
handler = self._state.get_current_query_handler()
result = handler.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
)
from exasol.analytics.query_handler.graph.stage.sql.sql_stage import SQLStage
from exasol.analytics.query_handler.graph.stage.sql.sql_stage_query_handler import (
SQLStageQueryHandlerInput,
SQLStageQueryHandlerInput, SQLStageQueryHandler
)
from exasol.analytics.query_handler.query_handler import QueryHandler
from exasol.analytics.query_handler.result import Continue, Finish
from exasol.analytics.utils.errors import UninitializedAttributeError

Expand Down Expand Up @@ -53,15 +52,13 @@ def __init__(
self._current_stage_index
]
self._stage_inputs_map[self._current_stage].append(parameter.input)
self._current_query_handler: Optional[
QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]
] = None
self._current_query_handler: Optional[SQLStageQueryHandler] = None
self._current_qh_context: Optional[ScopeQueryHandlerContext] = None
self._create_current_query_handler()

def get_current_query_handler(
self,
) -> QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]:
) -> SQLStageQueryHandler:
value = self._current_query_handler
if value is None:
raise RuntimeError("No current query handler set.")
Expand All @@ -86,6 +83,16 @@ def _checked_current_stage(self) -> SQLStage:
def handle_result(
self, result: Union[Continue, Finish[SQLStageInputOutput]]
) -> ResultHandlerReturnValue:
"""
Here we look at the provided last result obtained at the current node.
If it is encapsulated in a Finish object we then either save the result so that
it can be used as an input at the next node, or take it as the final output in
case this node is the last one. In the former case we move to the next node and
return CONTINUE_PROCESSING. In the latter case we return RETURN_RESULT.
If the result is encapsulated in a Continue object we return RETURN_RESULT.
"""
# check if current query handler is set
self.get_current_query_handler()
if isinstance(result, Finish):
Expand All @@ -98,6 +105,11 @@ def handle_result(
def _handle_finished_result(
self, result: Finish[SQLStageInputOutput]
) -> ResultHandlerReturnValue:
"""
We are done with the current node.
If there are successors we need to add the final output from this node to the
list of inputs of all the successors.
"""
if self._is_not_last_stage():
self._add_result_to_successors(result.result)
else:
Expand Down
15 changes: 10 additions & 5 deletions exasol/analytics/query_handler/graph/stage/sql/input_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@
@dataclass(frozen=True)
class SQLStageInputOutput:
"""
A SQLStageInputOutput is used as input and output between the SQLStageQueryHandler.
It contains a dataset and dependencies. The dataset is used to represent train and test data.
The dependencies can be used to communicate any data to the subsequently stages.
For example, a dependency could be a table which the previous stage computed and
the subsequent one uses.
This is the root type for classes representing the input/output data of a
customer-provided node-level query handler - SQLStageQueryHandler. The actual
content of the input/output is application specific.
"""
pass


@dataclass(frozen=True)
class MultiDatasetSQLStageInputOutput(SQLStageInputOutput):
"""
An implementation of SQLStageInputOutput holding a collection of datasets and
dependencies. Can be used, for example, for model training. The datasets may be
used to represent train and test data. The dependencies can be used to communicate
any data to the subsequent stages. For example, a dependency could be a table
which the previous stage computed and the subsequent one uses.
"""
datasets: Dict[object, Dataset]
dependencies: Dependencies = field(default_factory=dict)
6 changes: 6 additions & 0 deletions exasol/analytics/query_handler/graph/stage/sql/sql_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@


class SQLStage(Stage):
"""
This is a node of an ExecutionGraph.
Essentially, this is a node-level query handler factory. The query handler
itself is user-provided and so is this factory.
"""

@abc.abstractmethod
def create_query_handler(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
from exasol.analytics.query_handler.graph.stage.sql.sql_stage import SQLStage

SQLStageGraph = ExecutionGraph[SQLStage]
"""
A directed acyclic graph with nodes of type SQLStage.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@ def is_empty(obj: Sized):

@dataclasses.dataclass(eq=True)
class SQLStageQueryHandlerInput:
"""
A parameter type for the SQLStageQueryHandler.
Contains a list of Input/Output objects and a location in the BucketFS for
persisting the data if needed.
Why a list? Because the input data for a node can come from multiple
predecessors.
"""
sql_stage_inputs: List[SQLStageInputOutput]
result_bucketfs_location: AbstractBucketFSLocation

def __post_init__(self):
if is_empty(self.sql_stage_inputs):
raise AssertionError("Empty sql_stage_inputs not allowed.")
raise AssertionError("Empty sql_stage_inputs are not allowed.")


class SQLStageQueryHandler(
QueryHandler[SQLStageQueryHandlerInput, SQLStageInputOutput], ABC
):
"""
The base class for a QueryHandler of a single node. The implementation shall be
provided by the user. See the UDFQueryHandler and the user_guide.md for details
and guidelines on how to build a query handler.
"""
pass

0 comments on commit f8dd089

Please sign in to comment.