Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/#2322 - Log error when sequence or scenario is not consistent #2423

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions taipy/core/scenario/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import networkx as nx

from taipy.common.config.common._validate_id import _validate_id
from taipy.common.logger._taipy_logger import _TaipyLogger

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
Expand Down Expand Up @@ -106,6 +107,8 @@ def by_two(x: int):
id: ScenarioId
"""The unique identifier of this scenario."""

_logger = _TaipyLogger._get_logger()

def __init__(
self,
config_id: str,
Expand Down Expand Up @@ -611,12 +614,32 @@ def _is_consistent(self) -> bool:
if dag.number_of_nodes() == 0:
return True
if not nx.is_directed_acyclic_graph(dag):
self._logger.error(f'The DAG of scenario "{self.id}" is not a directed acyclic graph')
return False
for left_node, right_node in dag.edges:
if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or (
isinstance(left_node, Task) and isinstance(right_node, DataNode)
):
continue

left_node_desc = (
f'{left_node.__class__.__name__} "{left_node.get_label()}"'
if isinstance(left_node, _Labeled)
else left_node.__class__.__name__
if left_node
else "None"
)
right_node_desc = (
f'{right_node.__class__.__name__} "{right_node.get_label()}"'
if isinstance(right_node, _Labeled)
else right_node.__class__.__name__
if right_node
else "None"
)
self._logger.error(
f'Invalid edge detected in scenario "{self.id}": left node {left_node_desc} and right node '
f"{right_node_desc} must connect a Task and a DataNode"
)
return False
return True

Expand Down
25 changes: 25 additions & 0 deletions taipy/core/sequence/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import networkx as nx

from taipy.common.config.common._validate_id import _validate_id
from taipy.common.logger._taipy_logger import _TaipyLogger

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
Expand Down Expand Up @@ -118,6 +119,8 @@ def planning(forecast, capacity):
_MANAGER_NAME = "sequence"
__CHECK_INIT_DONE_ATTR_NAME = "_init_done"

_logger = _TaipyLogger._get_logger()

id: SequenceId
"""The unique identifier of the sequence."""

Expand Down Expand Up @@ -343,15 +346,37 @@ def _is_consistent(self) -> bool:
if dag.number_of_nodes() == 0:
return True
if not nx.is_directed_acyclic_graph(dag):
self._logger.error(f'The DAG of sequence "{self.id}" is not a directed acyclic graph')
return False
if not nx.is_weakly_connected(dag):
self._logger.error(f'The DAG of sequence "{self.id}" is not weakly connected')
return False
for left_node, right_node in dag.edges:
if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or (
isinstance(left_node, Task) and isinstance(right_node, DataNode)
):
continue

left_node_desc = (
f'{left_node.__class__.__name__} "{left_node.get_label()}"'
if isinstance(left_node, _Labeled)
else left_node.__class__.__name__
if left_node
else "None"
)
right_node_desc = (
f'{right_node.__class__.__name__} "{right_node.get_label()}"'
if isinstance(right_node, _Labeled)
else right_node.__class__.__name__
if right_node
else "None"
)
self._logger.error(
f'Invalid edge detected in sequence "{self.id}": left node {left_node_desc} and right node '
f"{right_node_desc} must connect a Task and a DataNode"
)
return False

return True

def _get_tasks(self) -> Dict[str, Task]:
Expand Down
22 changes: 22 additions & 0 deletions tests/core/scenario/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,3 +1538,25 @@ def test_check_consistency():
task_5 = Task("bob", {}, print, [data_node_5], [data_node_3], TaskId("t5"))
scenario_9 = Scenario("scenario_9", [task_1, task_2, task_3, task_4, task_5], {}, [], scenario_id=ScenarioId("s8"))
assert scenario_9._is_consistent()


def test_check_inconsistency(caplog):
class FakeDataNode:
config_id = "config_id_of_a_fake_dn"

data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")

task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [FakeDataNode()], TaskId("t1"))
task_2 = Task("garply", {}, print, [data_node_1], [data_node_2], id=TaskId("t2"))
scenario_1 = Scenario("scenario_1", [task_1, task_2], {}, [], scenario_id=ScenarioId("s1"))
assert not scenario_1._is_consistent()
assert (
'Invalid edge detected in scenario "s1": left node Task "grault" and right node FakeDataNode'
" must connect a Task and a DataNode" in caplog.text
)

task_3 = Task("waldo", {}, print, [data_node_2], [data_node_1], id=TaskId("t3"))
scenario_2 = Scenario("scenario_2", [task_2, task_3], {}, [], scenario_id=ScenarioId("s2"))
assert not scenario_2._is_consistent()
assert 'The DAG of scenario "s2" is not a directed acyclic graph' in caplog.text
16 changes: 12 additions & 4 deletions tests/core/sequence/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
from taipy.core.task.task import Task, TaskId


class FakeDataNode:
config_id = "config_id_of_a_fake_dn"


def test_sequence_equals():
task_config = Config.configure_task("mult_by_3", print, [], None)
scenario_config = Config.configure_scenario("scenario", [task_config])
Expand Down Expand Up @@ -143,7 +147,7 @@ def test_get_set_attribute():
sequence.bar = "KeyAlreadyUsed"


def test_check_consistency():
def test_check_consistency(caplog):
sequence_1 = Sequence({}, [], "name_1")
assert sequence_1._is_consistent()

Expand All @@ -157,23 +161,26 @@ def test_check_consistency():
task_3 = Task("tfoo", {}, print, [data_node_3], [data_node_3], TaskId("task_id_3"))
sequence_3 = Sequence({}, [task_3], "name_3")
assert not sequence_3._is_consistent() # Not a dag
assert 'The DAG of sequence "name_3" is not a directed acyclic graph' in caplog.text

input_4 = InMemoryDataNode("foo", Scope.SCENARIO)
output_4 = InMemoryDataNode("bar", Scope.SCENARIO)
task_4_1 = Task("tfoo", {}, print, [input_4], [output_4], TaskId("task_id_4_1"))
task_4_2 = Task("tbar", {}, print, [output_4], [input_4], TaskId("task_id_4_2"))
sequence_4 = Sequence({}, [task_4_1, task_4_2], "name_4")
assert not sequence_4._is_consistent() # Not a Dag

class FakeDataNode:
config_id = "config_id_of_a_fake_dn"
assert 'The DAG of sequence "name_4" is not a directed acyclic graph' in caplog.text

input_6 = DataNode("foo", Scope.SCENARIO, "input_id_5")
output_6 = DataNode("bar", Scope.SCENARIO, "output_id_5")
task_6_1 = Task("tfoo", {}, print, [input_6], [output_6], TaskId("task_id_5_1"))
task_6_2 = Task("tbar", {}, print, [output_6], [FakeDataNode()], TaskId("task_id_5_2"))
sequence_6 = Sequence({}, [task_6_1, task_6_2], "name_5")
assert not sequence_6._is_consistent() # Not a DataNode
assert (
'Invalid edge detected in sequence "name_5": left node Task "tbar" and right node FakeDataNode '
"must connect a Task and a DataNode" in caplog.text
)

intermediate_7 = DataNode("foo", Scope.SCENARIO, "intermediate_id_7")
output_7 = DataNode("bar", Scope.SCENARIO, "output_id_7")
Expand All @@ -197,6 +204,7 @@ class FakeDataNode:
task_9_2 = Task("tbar", {}, print, [input_9_2], [output_9_2], TaskId("task_id_9_2"))
sequence_9 = Sequence({}, [task_9_1, task_9_2], "name_9")
assert not sequence_9._is_consistent() # Not connected
assert 'The DAG of sequence "name_9" is not weakly connected' in caplog.text

input_10_1 = DataNode("foo", Scope.SCENARIO, "output_id_10_1")
intermediate_10_1 = DataNode("bar", Scope.SCENARIO, "intermediate_id_10_1")
Expand Down
Loading