From f8dd089649abff5122eebfa435c82e8c148dd8b7 Mon Sep 17 00:00:00 2001 From: mibe Date: Fri, 20 Dec 2024 10:41:59 +0000 Subject: [PATCH 1/6] Added some comments and docstrings --- .../query_handler/graph/execution_graph.py | 5 ++++ .../query_handler/graph/stage/sql/dataset.py | 3 +++ .../graph/stage/sql/execution/input.py | 6 +++++ .../stage/sql/execution/query_handler.py | 20 ++++++++++++++++ .../sql/execution/query_handler_state.py | 24 ++++++++++++++----- .../graph/stage/sql/input_output.py | 15 ++++++++---- .../graph/stage/sql/sql_stage.py | 6 +++++ .../graph/stage/sql/sql_stage_graph.py | 3 +++ .../stage/sql/sql_stage_query_handler.py | 15 +++++++++++- 9 files changed, 85 insertions(+), 12 deletions(-) diff --git a/exasol/analytics/query_handler/graph/execution_graph.py b/exasol/analytics/query_handler/graph/execution_graph.py index c1b4ede6..197aa527 100644 --- a/exasol/analytics/query_handler/graph/execution_graph.py +++ b/exasol/analytics/query_handler/graph/execution_graph.py @@ -8,6 +8,11 @@ class ExecutionGraph(Generic[T]): + """ + The class is a generic implementation of a directed acyclic graph, with nodes + of type T. + The graph provides a node traverse order. + """ def __init__(self, start_node: T, end_node: T, edges: Set[Tuple[T, T]]): self._graph = nx.DiGraph() diff --git a/exasol/analytics/query_handler/graph/stage/sql/dataset.py b/exasol/analytics/query_handler/graph/stage/sql/dataset.py index 36ab82ca..517872f3 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/dataset.py +++ b/exasol/analytics/query_handler/graph/stage/sql/dataset.py @@ -13,6 +13,9 @@ class Dataset: The TableLike refers to a database table containing the actual data that can be used for instance in training or testing. + + Q. A TableLike is basically a list of columns and a name. Why do we have + a separate list of columns here? """ table_like: TableLike identifier_columns: List[Column] diff --git a/exasol/analytics/query_handler/graph/stage/sql/execution/input.py b/exasol/analytics/query_handler/graph/stage/sql/execution/input.py index e50ac77e..50276cfb 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/execution/input.py +++ b/exasol/analytics/query_handler/graph/stage/sql/execution/input.py @@ -10,6 +10,12 @@ @dataclasses.dataclass(frozen=True, eq=True) class SQLStageGraphExecutionInput: + """ + The class is an input for the graph query handler (not to be confused with + a node query handler provided by a user). + It includes the input data for the root node of the graph, the place in the BucketFS + where the result should be stored and the graph itself. + """ input: SQLStageInputOutput result_bucketfs_location: bfs.path.PathLike sql_stage_graph: SQLStageGraph diff --git a/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler.py b/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler.py index 414145ed..44f2a409 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler.py +++ b/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler.py @@ -24,6 +24,11 @@ class SQLStageGraphExecutionQueryHandler( QueryHandler[SQLStageGraphExecutionInput, SQLStageInputOutput] ): + """ + Implementation of the execution graph QueryHandler. Each node of the graph contains + its own, user provided, query handler. + """ + def __init__( self, parameter: SQLStageGraphExecutionInput, @@ -34,12 +39,20 @@ def __init__( self._state = query_handler_state_factory(parameter, query_handler_context) def start(self) -> Union[Continue, Finish[SQLStageInputOutput]]: + # Start traversing the graph from the root node. result = self._run_until_continue_or_last_stage_finished() return result def handle_query_result( self, query_result: QueryResult ) -> Union[Continue, Finish[SQLStageInputOutput]]: + """ + Here we call the handle_query_result() of the query handler at the current node. + If this handler returns a Finish object then we proceed to the next node, if + there is any, and start traversing the graph from there (see + _run_until_continue_or_last_stage_finished()). Otherwise, if the returned object + is Continue, we stay on the same node and return the result. + """ result = self._state.get_current_query_handler().handle_query_result( query_result ) @@ -57,6 +70,13 @@ def handle_query_result( def _run_until_continue_or_last_stage_finished( self, ) -> Union[Continue, Finish[SQLStageInputOutput]]: + """ + We start traversing the graph from the current node by calling the start() on + its query handler. If the handler returns a Continue object we will exit the + loop and return the result. Otherwise, if the handler returns a Finish object, + we will move to the next node and call the start() on its query handler. We will + carry on until a Continue object is returned or the whole graph is traversed. + """ while True: handler = self._state.get_current_query_handler() result = handler.start() diff --git a/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py b/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py index 7b63a214..19b4e337 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py +++ b/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py @@ -17,9 +17,8 @@ ) from exasol.analytics.query_handler.graph.stage.sql.sql_stage import SQLStage from exasol.analytics.query_handler.graph.stage.sql.sql_stage_query_handler import ( - SQLStageQueryHandlerInput, + SQLStageQueryHandlerInput, SQLStageQueryHandler ) -from exasol.analytics.query_handler.query_handler import QueryHandler from exasol.analytics.query_handler.result import Continue, Finish from exasol.analytics.utils.errors import UninitializedAttributeError @@ -53,15 +52,13 @@ def __init__( self._current_stage_index ] self._stage_inputs_map[self._current_stage].append(parameter.input) - self._current_query_handler: Optional[ - QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput] - ] = None + self._current_query_handler: Optional[SQLStageQueryHandler] = None self._current_qh_context: Optional[ScopeQueryHandlerContext] = None self._create_current_query_handler() def get_current_query_handler( self, - ) -> QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]: + ) -> SQLStageQueryHandler: value = self._current_query_handler if value is None: raise RuntimeError("No current query handler set.") @@ -86,6 +83,16 @@ def _checked_current_stage(self) -> SQLStage: def handle_result( self, result: Union[Continue, Finish[SQLStageInputOutput]] ) -> ResultHandlerReturnValue: + """ + Here we look at the provided last result obtained at the current node. + + If it is encapsulated in a Finish object we then either save the result so that + it can be used as an input at the next node, or take it as the final output in + case this node is the last one. In the former case we move to the next node and + return CONTINUE_PROCESSING. In the latter case we return RETURN_RESULT. + + If the result is encapsulated in a Continue object we return RETURN_RESULT. + """ # check if current query handler is set self.get_current_query_handler() if isinstance(result, Finish): @@ -98,6 +105,11 @@ def handle_result( def _handle_finished_result( self, result: Finish[SQLStageInputOutput] ) -> ResultHandlerReturnValue: + """ + We are done with the current node. + If there are successors we need to add the final output from this node to the + list of inputs of all the successors. + """ if self._is_not_last_stage(): self._add_result_to_successors(result.result) else: diff --git a/exasol/analytics/query_handler/graph/stage/sql/input_output.py b/exasol/analytics/query_handler/graph/stage/sql/input_output.py index 960283a0..4a86d857 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/input_output.py +++ b/exasol/analytics/query_handler/graph/stage/sql/input_output.py @@ -8,16 +8,21 @@ @dataclass(frozen=True) class SQLStageInputOutput: """ - A SQLStageInputOutput is used as input and output between the SQLStageQueryHandler. - It contains a dataset and dependencies. The dataset is used to represent train and test data. - The dependencies can be used to communicate any data to the subsequently stages. - For example, a dependency could be a table which the previous stage computed and - the subsequent one uses. + This is a type root for a class representing an input/output data for a customer + provided node-level query handler - SQLStageQueryHandler. The actual content of + the input/output is application specific. """ pass @dataclass(frozen=True) class MultiDatasetSQLStageInputOutput(SQLStageInputOutput): + """ + An implementation of SQLStageInputOutput holding a collection of datasets and + dependencies. Can be used, for example, for model training. The datasets may be + used to represent train and test data. The dependencies can be used to communicate + any data to the subsequently stages. For example, a dependency could be a table + which the previous stage computed and the subsequent one uses. + """ datasets: Dict[object, Dataset] dependencies: Dependencies = field(default_factory=dict) diff --git a/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py b/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py index 8715e7f0..dd65deb2 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py +++ b/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py @@ -9,6 +9,12 @@ class SQLStage(Stage): + """ + This is a node of an ExecutionGraph. + Essentially, this is a node-level query handler factory. The query handler + itself is user-provided and so is this factory. + """ + @abc.abstractmethod def create_query_handler( self, diff --git a/exasol/analytics/query_handler/graph/stage/sql/sql_stage_graph.py b/exasol/analytics/query_handler/graph/stage/sql/sql_stage_graph.py index 7514f3b2..3db2ea7d 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/sql_stage_graph.py +++ b/exasol/analytics/query_handler/graph/stage/sql/sql_stage_graph.py @@ -2,3 +2,6 @@ from exasol.analytics.query_handler.graph.stage.sql.sql_stage import SQLStage SQLStageGraph = ExecutionGraph[SQLStage] +""" +A directed acyclic graph with nodes of type SQLStage. +""" diff --git a/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py b/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py index ffbdb460..038ee49a 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py +++ b/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py @@ -18,15 +18,28 @@ def is_empty(obj: Sized): @dataclasses.dataclass(eq=True) class SQLStageQueryHandlerInput: + """ + A Parameter type for the SQLStageQueryHandler. + Contains a list of Input/Output objects and a location in the BucketFS for + persisting the data if needed. + + Why is the list? Because the input data for a node can come from multiple + predecessors. + """ sql_stage_inputs: List[SQLStageInputOutput] result_bucketfs_location: AbstractBucketFSLocation def __post_init__(self): if is_empty(self.sql_stage_inputs): - raise AssertionError("Empty sql_stage_inputs not allowed.") + raise AssertionError("Empty sql_stage_inputs are not allowed.") class SQLStageQueryHandler( QueryHandler[SQLStageQueryHandlerInput, SQLStageInputOutput], ABC ): + """ + The base class for a QueryHandler of a single node. The implementation shall be + provided by the user. See the UDFQueryHandler and the user_guide.md for details + and guidelines on how to build a query handler. + """ pass From 179900fe0f9ae5b1c0ac71f30a53bcf1490868d7 Mon Sep 17 00:00:00 2001 From: mibe Date: Fri, 20 Dec 2024 11:24:42 +0000 Subject: [PATCH 2/6] Fixed a unit test --- .../sql_stage_graph/test_sql_stage_train_query_handler_input.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/sql_stage_graph/test_sql_stage_train_query_handler_input.py b/tests/unit_tests/sql_stage_graph/test_sql_stage_train_query_handler_input.py index d7a47aed..b437ef0d 100644 --- a/tests/unit_tests/sql_stage_graph/test_sql_stage_train_query_handler_input.py +++ b/tests/unit_tests/sql_stage_graph/test_sql_stage_train_query_handler_input.py @@ -18,7 +18,7 @@ def test_empty_stage_inputs(): bucketfs_location: Union[AbstractBucketFSLocation, MagicMock] = create_autospec( AbstractBucketFSLocation ) - with pytest.raises(AssertionError, match="Empty sql_stage_inputs not allowed."): + with pytest.raises(AssertionError, match="Empty sql_stage_inputs are not allowed."): SQLStageQueryHandlerInput( sql_stage_inputs=[], result_bucketfs_location=bucketfs_location ) From 3b652004176361128edf4efd0737ca5d9eb423c7 Mon Sep 17 00:00:00 2001 From: Mikhail Beck Date: Mon, 23 Dec 2024 08:51:08 +0000 Subject: [PATCH 3/6] Apply suggestions from code review Co-authored-by: Christoph Kuhnke --- .../analytics/query_handler/graph/stage/sql/input_output.py | 6 +++--- exasol/analytics/query_handler/graph/stage/sql/sql_stage.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/exasol/analytics/query_handler/graph/stage/sql/input_output.py b/exasol/analytics/query_handler/graph/stage/sql/input_output.py index 4a86d857..212ccb1c 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/input_output.py +++ b/exasol/analytics/query_handler/graph/stage/sql/input_output.py @@ -8,9 +8,9 @@ @dataclass(frozen=True) class SQLStageInputOutput: """ - This is a type root for a class representing an input/output data for a customer - provided node-level query handler - SQLStageQueryHandler. The actual content of - the input/output is application specific. + This is a type root for a class representing input/output data for a customer + provided node-level query handler extending class SQLStageQueryHandler. The actual content of + the input/output is application-specific. """ pass diff --git a/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py b/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py index dd65deb2..50520df8 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py +++ b/exasol/analytics/query_handler/graph/stage/sql/sql_stage.py @@ -12,7 +12,7 @@ class SQLStage(Stage): """ This is a node of an ExecutionGraph. Essentially, this is a node-level query handler factory. The query handler - itself is user-provided and so is this factory. + itself is user-provided and so are the factories implementing this interface. """ @abc.abstractmethod From ada74b9931c31f17140ad597b7d8b0246e53c699 Mon Sep 17 00:00:00 2001 From: Mikhail Beck Date: Wed, 8 Jan 2025 14:47:47 +0000 Subject: [PATCH 4/6] Update exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py Co-authored-by: Christoph Kuhnke --- .../graph/stage/sql/execution/query_handler_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py b/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py index 19b4e337..96f6b6e5 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py +++ b/exasol/analytics/query_handler/graph/stage/sql/execution/query_handler_state.py @@ -107,7 +107,7 @@ def _handle_finished_result( ) -> ResultHandlerReturnValue: """ We are done with the current node. - If there are successors we need to add the final output from this node to the + If there are successors, then we need to add the final output from this node to the list of inputs of all the successors. """ if self._is_not_last_stage(): From d74e3a11dd5ce43a18ca2987b5b890924e9013e6 Mon Sep 17 00:00:00 2001 From: mibe Date: Thu, 9 Jan 2025 14:53:03 +0000 Subject: [PATCH 5/6] Updated a docstring --- .../query_handler/graph/stage/sql/sql_stage_query_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py b/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py index 038ee49a..8aa12e85 100644 --- a/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py +++ b/exasol/analytics/query_handler/graph/stage/sql/sql_stage_query_handler.py @@ -23,8 +23,8 @@ class SQLStageQueryHandlerInput: Contains a list of Input/Output objects and a location in the BucketFS for persisting the data if needed. - Why is the list? Because the input data for a node can come from multiple - predecessors. + The `sql_stage_inputs` is a list because the input data for a node can come + from multiple predecessors. """ sql_stage_inputs: List[SQLStageInputOutput] result_bucketfs_location: AbstractBucketFSLocation From 4bce8af3c41dc67871f1ee4b1f34daac6e36ceec Mon Sep 17 00:00:00 2001 From: mibe Date: Thu, 9 Jan 2025 15:36:46 +0000 Subject: [PATCH 6/6] kick-off the tests