Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added some comments and docstrings #242

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions exasol/analytics/query_handler/graph/execution_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@


class ExecutionGraph(Generic[T]):
"""
The class is a generic implementation of a directed acyclic graph, with nodes
of type T.
The graph provides a node traversal order.
"""

def __init__(self, start_node: T, end_node: T, edges: Set[Tuple[T, T]]):
self._graph = nx.DiGraph()
Expand Down
3 changes: 3 additions & 0 deletions exasol/analytics/query_handler/graph/stage/sql/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ class Dataset:

The TableLike refers to a database table containing the actual data that
can be used for instance in training or testing.

Q. A TableLike is basically a list of columns and a name. Why do we have
a separate list of columns here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK there was and is a separation of "ordinary" columns and "identifier columns" which may be handled specially in some cases? But the attribute columns could be superfluous in my understanding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's what I meant. The identifier_columns OK, I can understand. But just columns are inside the table_like and then also separately. Why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot answer your question. Could you ask @tkilias ?
I am happy to join a discussion so that I can understand the background.

"""
table_like: TableLike
identifier_columns: List[Column]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

@dataclasses.dataclass(frozen=True, eq=True)
class SQLStageGraphExecutionInput:
"""
The class is an input for the graph query handler (not to be confused with
a node query handler provided by a user).
ckunki marked this conversation as resolved.
Show resolved Hide resolved
It includes the input data for the root node of the graph, the place in the BucketFS
where the result should be stored and the graph itself.
"""
input: SQLStageInputOutput
result_bucketfs_location: bfs.path.PathLike
sql_stage_graph: SQLStageGraph
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
class SQLStageGraphExecutionQueryHandler(
QueryHandler[SQLStageGraphExecutionInput, SQLStageInputOutput]
):
"""
Implementation of the execution graph QueryHandler. Each node of the graph contains
its own, user provided, query handler.
"""

def __init__(
self,
parameter: SQLStageGraphExecutionInput,
Expand All @@ -34,12 +39,20 @@ def __init__(
self._state = query_handler_state_factory(parameter, query_handler_context)

def start(self) -> Union[Continue, Finish[SQLStageInputOutput]]:
    """
    Begin execution at the root node of the graph and keep running stage
    query handlers until one yields a Continue or the last stage finishes.
    """
    return self._run_until_continue_or_last_stage_finished()

def handle_query_result(
self, query_result: QueryResult
) -> Union[Continue, Finish[SQLStageInputOutput]]:
"""
Here we call the handle_query_result() of the query handler at the current node.
If this handler returns a Finish object then we proceed to the next node, if
there is any, and start traversing the graph from there (see
_run_until_continue_or_last_stage_finished()). Otherwise, if the returned object
is Continue, we stay on the same node and return the result.
"""
result = self._state.get_current_query_handler().handle_query_result(
query_result
)
Expand All @@ -57,6 +70,13 @@ def handle_query_result(
def _run_until_continue_or_last_stage_finished(
self,
) -> Union[Continue, Finish[SQLStageInputOutput]]:
"""
We start traversing the graph from the current node by calling the start() on
its query handler. If the handler returns a Continue object we will exit the
loop and return the result. Otherwise, if the handler returns a Finish object,
we will move to the next node and call the start() on its query handler. We will
carry on until a Continue object is returned or the whole graph is traversed.
"""
while True:
handler = self._state.get_current_query_handler()
result = handler.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
)
from exasol.analytics.query_handler.graph.stage.sql.sql_stage import SQLStage
from exasol.analytics.query_handler.graph.stage.sql.sql_stage_query_handler import (
SQLStageQueryHandlerInput,
SQLStageQueryHandlerInput, SQLStageQueryHandler
)
from exasol.analytics.query_handler.query_handler import QueryHandler
from exasol.analytics.query_handler.result import Continue, Finish
from exasol.analytics.utils.errors import UninitializedAttributeError

Expand Down Expand Up @@ -53,15 +52,13 @@ def __init__(
self._current_stage_index
]
self._stage_inputs_map[self._current_stage].append(parameter.input)
self._current_query_handler: Optional[
QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]
] = None
self._current_query_handler: Optional[SQLStageQueryHandler] = None
self._current_qh_context: Optional[ScopeQueryHandlerContext] = None
self._create_current_query_handler()

def get_current_query_handler(
self,
) -> QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]:
) -> SQLStageQueryHandler:
value = self._current_query_handler
if value is None:
raise RuntimeError("No current query handler set.")
Expand All @@ -86,6 +83,16 @@ def _checked_current_stage(self) -> SQLStage:
def handle_result(
self, result: Union[Continue, Finish[SQLStageInputOutput]]
) -> ResultHandlerReturnValue:
"""
Here we look at the provided last result obtained at the current node.

If it is encapsulated in a Finish object we then either save the result so that
it can be used as an input at the next node, or take it as the final output in
case this node is the last one. In the former case we move to the next node and
return CONTINUE_PROCESSING. In the latter case we return RETURN_RESULT.

If the result is encapsulated in a Continue object we return RETURN_RESULT.
"""
# check if current query handler is set
self.get_current_query_handler()
if isinstance(result, Finish):
Expand All @@ -98,6 +105,11 @@ def handle_result(
def _handle_finished_result(
self, result: Finish[SQLStageInputOutput]
) -> ResultHandlerReturnValue:
"""
We are done with the current node.
If there are successors, then we need to add the final output from this node to the
list of inputs of all the successors.
"""
if self._is_not_last_stage():
self._add_result_to_successors(result.result)
else:
Expand Down
15 changes: 10 additions & 5 deletions exasol/analytics/query_handler/graph/stage/sql/input_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@
@dataclass(frozen=True)
class SQLStageInputOutput:
    """
    Root type for the input/output data exchanged between the user-provided
    node-level query handlers (subclasses of SQLStageQueryHandler).
    The actual content of the input/output is application-specific and is
    defined by the concrete subclasses.
    """
    pass


@dataclass(frozen=True)
class MultiDatasetSQLStageInputOutput(SQLStageInputOutput):
    """
    An implementation of SQLStageInputOutput holding a collection of datasets
    and dependencies. Can be used, for example, for model training, where the
    datasets may represent train and test data. The dependencies can be used
    to communicate arbitrary data to subsequent stages. For example, a
    dependency could be a table which a previous stage computed and a
    subsequent one uses.
    """
    # Mapping from an application-defined key to a dataset.
    datasets: Dict[object, Dataset]
    # Arbitrary data passed on to subsequent stages; empty by default.
    dependencies: Dependencies = field(default_factory=dict)
6 changes: 6 additions & 0 deletions exasol/analytics/query_handler/graph/stage/sql/sql_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@


class SQLStage(Stage):
"""
This is a node of an ExecutionGraph.
Essentially, this is a node-level query handler factory. The query handler
itself is user-provided and so are the factories implementing this interface.
"""

@abc.abstractmethod
def create_query_handler(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
from exasol.analytics.query_handler.graph.stage.sql.sql_stage import SQLStage

SQLStageGraph = ExecutionGraph[SQLStage]
"""
A directed acyclic graph with nodes of type SQLStage.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@ def is_empty(obj: Sized):

@dataclasses.dataclass(eq=True)
class SQLStageQueryHandlerInput:
    """
    Parameter type for SQLStageQueryHandler.

    Contains a list of input/output objects and a location in the BucketFS
    for persisting the data if needed.

    `sql_stage_inputs` is a list because the input data for a node can come
    from multiple predecessor nodes.
    """
    sql_stage_inputs: List[SQLStageInputOutput]
    result_bucketfs_location: AbstractBucketFSLocation

    def __post_init__(self):
        # A stage must receive at least one input; fail fast at construction.
        if is_empty(self.sql_stage_inputs):
            raise AssertionError("Empty sql_stage_inputs are not allowed.")


class SQLStageQueryHandler(
    QueryHandler[SQLStageQueryHandlerInput, SQLStageInputOutput], ABC
):
    """
    Base class for the query handler of a single node (stage) of the graph.
    Implementations are to be provided by the user. See the UDFQueryHandler
    and the user_guide.md for details and guidelines on how to build a
    query handler.
    """
    pass
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_empty_stage_inputs():
bucketfs_location: Union[AbstractBucketFSLocation, MagicMock] = create_autospec(
AbstractBucketFSLocation
)
with pytest.raises(AssertionError, match="Empty sql_stage_inputs not allowed."):
with pytest.raises(AssertionError, match="Empty sql_stage_inputs are not allowed."):
SQLStageQueryHandlerInput(
sql_stage_inputs=[], result_bucketfs_location=bucketfs_location
)
Expand Down
Loading