From 82aab351891233dc7b49e7d6c95ca1fdbc2c8239 Mon Sep 17 00:00:00 2001 From: trgiangdo Date: Thu, 23 Jan 2025 16:36:28 +0700 Subject: [PATCH] feature: log error when sequence or scenario is not consistent --- taipy/core/scenario/scenario.py | 23 +++++++++++++++++++++++ taipy/core/sequence/sequence.py | 25 +++++++++++++++++++++++++ tests/core/scenario/test_scenario.py | 22 ++++++++++++++++++++++ tests/core/sequence/test_sequence.py | 16 ++++++++++++---- 4 files changed, 82 insertions(+), 4 deletions(-) diff --git a/taipy/core/scenario/scenario.py b/taipy/core/scenario/scenario.py index bdb5652963..1a7c391410 100644 --- a/taipy/core/scenario/scenario.py +++ b/taipy/core/scenario/scenario.py @@ -17,6 +17,7 @@ import networkx as nx from taipy.common.config.common._validate_id import _validate_id +from taipy.common.logger._taipy_logger import _TaipyLogger from .._entity._entity import _Entity from .._entity._labeled import _Labeled @@ -106,6 +107,8 @@ def by_two(x: int): id: ScenarioId """The unique identifier of this scenario.""" + _logger = _TaipyLogger._get_logger() + def __init__( self, config_id: str, @@ -611,12 +614,32 @@ def _is_consistent(self) -> bool: if dag.number_of_nodes() == 0: return True if not nx.is_directed_acyclic_graph(dag): + self._logger.error(f'The DAG of scenario "{self.id}" is not a directed acyclic graph') return False for left_node, right_node in dag.edges: if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or ( isinstance(left_node, Task) and isinstance(right_node, DataNode) ): continue + + left_node_desc = ( + f'{left_node.__class__.__name__} "{left_node.get_label()}"' + if isinstance(left_node, _Labeled) + else left_node.__class__.__name__ + if left_node + else "None" + ) + right_node_desc = ( + f'{right_node.__class__.__name__} "{right_node.get_label()}"' + if isinstance(right_node, _Labeled) + else right_node.__class__.__name__ + if right_node + else "None" + ) + self._logger.error( + f'Invalid edge detected in scenario "{self.id}": left node {left_node_desc} and right node ' + f"{right_node_desc} must connect a Task and a DataNode" + ) return False return True diff --git a/taipy/core/sequence/sequence.py b/taipy/core/sequence/sequence.py index b07d0e348d..2e8d1c33e7 100644 --- a/taipy/core/sequence/sequence.py +++ b/taipy/core/sequence/sequence.py @@ -16,6 +16,7 @@ import networkx as nx from taipy.common.config.common._validate_id import _validate_id +from taipy.common.logger._taipy_logger import _TaipyLogger from .._entity._entity import _Entity from .._entity._labeled import _Labeled @@ -118,6 +119,8 @@ def planning(forecast, capacity): _MANAGER_NAME = "sequence" __CHECK_INIT_DONE_ATTR_NAME = "_init_done" + _logger = _TaipyLogger._get_logger() + id: SequenceId """The unique identifier of the sequence.""" @@ -343,15 +346,37 @@ def _is_consistent(self) -> bool: if dag.number_of_nodes() == 0: return True if not nx.is_directed_acyclic_graph(dag): + self._logger.error(f'The DAG of sequence "{self.id}" is not a directed acyclic graph') return False if not nx.is_weakly_connected(dag): + self._logger.error(f'The DAG of sequence "{self.id}" is not weakly connected') return False for left_node, right_node in dag.edges: if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or ( isinstance(left_node, Task) and isinstance(right_node, DataNode) ): continue + + left_node_desc = ( + f'{left_node.__class__.__name__} "{left_node.get_label()}"' + if isinstance(left_node, _Labeled) + else left_node.__class__.__name__ + if left_node + else "None" + ) + right_node_desc = ( + f'{right_node.__class__.__name__} "{right_node.get_label()}"' + if isinstance(right_node, _Labeled) + else right_node.__class__.__name__ + if right_node + else "None" + ) + self._logger.error( + f'Invalid edge detected in sequence "{self.id}": left node {left_node_desc} and right node ' + f"{right_node_desc} must connect a Task and a DataNode" + ) return False + return True def _get_tasks(self) -> Dict[str, Task]: diff --git a/tests/core/scenario/test_scenario.py b/tests/core/scenario/test_scenario.py index 72899c20ef..515fe369fb 100644 --- a/tests/core/scenario/test_scenario.py +++ b/tests/core/scenario/test_scenario.py @@ -1538,3 +1538,25 @@ def test_check_consistency(): task_5 = Task("bob", {}, print, [data_node_5], [data_node_3], TaskId("t5")) scenario_9 = Scenario("scenario_9", [task_1, task_2, task_3, task_4, task_5], {}, [], scenario_id=ScenarioId("s8")) assert scenario_9._is_consistent() + + +def test_check_inconsistency(caplog): + class FakeDataNode: + config_id = "config_id_of_a_fake_dn" + + data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1") + data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2") + + task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [FakeDataNode()], TaskId("t1")) + task_2 = Task("garply", {}, print, [data_node_1], [data_node_2], id=TaskId("t2")) + scenario_1 = Scenario("scenario_1", [task_1, task_2], {}, [], scenario_id=ScenarioId("s1")) + assert not scenario_1._is_consistent() + assert ( + 'Invalid edge detected in scenario "s1": left node Task "grault" and right node FakeDataNode' + " must connect a Task and a DataNode" in caplog.text + ) + + task_3 = Task("waldo", {}, print, [data_node_2], [data_node_1], id=TaskId("t3")) + scenario_2 = Scenario("scenario_2", [task_2, task_3], {}, [], scenario_id=ScenarioId("s2")) + assert not scenario_2._is_consistent() + assert 'The DAG of scenario "s2" is not a directed acyclic graph' in caplog.text diff --git a/tests/core/sequence/test_sequence.py b/tests/core/sequence/test_sequence.py index 9b78cb53df..f5cb7ca535 100644 --- a/tests/core/sequence/test_sequence.py +++ b/tests/core/sequence/test_sequence.py @@ -30,6 +30,10 @@ from taipy.core.task.task import Task, TaskId +class FakeDataNode: + config_id = "config_id_of_a_fake_dn" + + def test_sequence_equals(): task_config = Config.configure_task("mult_by_3", print, [], None) scenario_config = Config.configure_scenario("scenario", [task_config]) @@ -143,7 +147,7 @@ def test_get_set_attribute(): sequence.bar = "KeyAlreadyUsed" -def test_check_consistency(): +def test_check_consistency(caplog): sequence_1 = Sequence({}, [], "name_1") assert sequence_1._is_consistent() @@ -157,6 +161,7 @@ def test_check_consistency(): task_3 = Task("tfoo", {}, print, [data_node_3], [data_node_3], TaskId("task_id_3")) sequence_3 = Sequence({}, [task_3], "name_3") assert not sequence_3._is_consistent() # Not a dag + assert 'The DAG of sequence "name_3" is not a directed acyclic graph' in caplog.text input_4 = InMemoryDataNode("foo", Scope.SCENARIO) output_4 = InMemoryDataNode("bar", Scope.SCENARIO) @@ -164,9 +169,7 @@ def test_check_consistency(): task_4_2 = Task("tbar", {}, print, [output_4], [input_4], TaskId("task_id_4_2")) sequence_4 = Sequence({}, [task_4_1, task_4_2], "name_4") assert not sequence_4._is_consistent() # Not a Dag - - class FakeDataNode: - config_id = "config_id_of_a_fake_dn" + assert 'The DAG of sequence "name_4" is not a directed acyclic graph' in caplog.text input_6 = DataNode("foo", Scope.SCENARIO, "input_id_5") output_6 = DataNode("bar", Scope.SCENARIO, "output_id_5") @@ -174,6 +177,10 @@ class FakeDataNode: task_6_2 = Task("tbar", {}, print, [output_6], [FakeDataNode()], TaskId("task_id_5_2")) sequence_6 = Sequence({}, [task_6_1, task_6_2], "name_5") assert not sequence_6._is_consistent() # Not a DataNode + assert ( + 'Invalid edge detected in sequence "name_5": left node Task "tbar" and right node FakeDataNode ' + "must connect a Task and a DataNode" in caplog.text + ) intermediate_7 = DataNode("foo", Scope.SCENARIO, "intermediate_id_7") output_7 = DataNode("bar", Scope.SCENARIO, "output_id_7") @@ -197,6 +204,7 @@ class FakeDataNode: task_9_2 = Task("tbar", {}, print, [input_9_2], [output_9_2], TaskId("task_id_9_2")) sequence_9 = Sequence({}, [task_9_1, task_9_2], "name_9") assert not sequence_9._is_consistent() # Not connected + assert 'The DAG of sequence "name_9" is not weakly connected' in caplog.text input_10_1 = DataNode("foo", Scope.SCENARIO, "output_id_10_1") intermediate_10_1 = DataNode("bar", Scope.SCENARIO, "intermediate_id_10_1")