Skip to content

Commit

Permalink
docsctrings and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
FloSch62 committed Dec 20, 2024
1 parent b9ddd43 commit aa0b24a
Show file tree
Hide file tree
Showing 16 changed files with 323 additions and 374 deletions.
3 changes: 1 addition & 2 deletions core/data/graph_level_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ def assign_graphlevels(self, diagram, verbose=False) -> list:
already_set = True
else:
already_set = False
if verbose:
print(
print(
"Not all graph levels set in the .clab file. Assigning graph levels based on downstream links. Expect experimental output. Please consider assigning graph levels to your .clab file, or use it with -I for interactive mode. Find more information here: https://github.com/srl-labs/clab-io-draw/blob/grafana_style/docs/clab2drawio.md#influencing-node-placement"
)

Expand Down
41 changes: 21 additions & 20 deletions core/data/node_link_builder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import logging
from core.models.node import Node
from core.models.link import Link

logger = logging.getLogger(__name__)

class NodeLinkBuilder:
"""
Builds Node and Link objects from containerlab topology data and styling information.
"""

def __init__(self, containerlab_data: dict, styles: dict, prefix: str, lab_name: str):
"""
:param containerlab_data: Parsed containerlab topology data.
Expand All @@ -17,7 +21,13 @@ def __init__(self, containerlab_data: dict, styles: dict, prefix: str, lab_name:
self.prefix = prefix
self.lab_name = lab_name

def format_node_name(self, base_name):
def format_node_name(self, base_name: str) -> str:
"""
Format node name with given prefix and lab_name.
:param base_name: Original node name from containerlab data.
:return: Formatted node name string.
"""
if self.prefix == "":
return base_name
elif self.prefix == "clab" and not self.prefix:
Expand All @@ -27,21 +37,21 @@ def format_node_name(self, base_name):

def build_nodes_and_links(self):
"""
Build Node and Link objects from the provided containerlab data and return them.
Build Node and Link objects from the provided containerlab data.
:return: A tuple (nodes_dict, links_list) where:
nodes_dict is a mapping of node_name -> Node instance.
links_list is a list of Link instances.
:return: A tuple (nodes_dict, links_list)
"""
logger.debug("Building nodes...")
nodes = self._build_nodes()
logger.debug("Building links...")
links = self._build_links(nodes)
return nodes, links

def _build_nodes(self):
"""
Internal method to build Node objects.
Internal method to build Node instances from containerlab topology data.
:return: Dictionary of node_name -> Node instances.
:return: Dictionary of node_name -> Node
"""
nodes_from_clab = self.containerlab_data["topology"]["nodes"]

Expand Down Expand Up @@ -73,10 +83,10 @@ def _build_nodes(self):

def _build_links(self, nodes):
"""
Internal method to build Link objects and attach them to nodes.
Internal method to build Link instances and attach them to their respective nodes.
:param nodes: Dictionary of node_name -> Node instances.
:return: List of Link instances.
:param nodes: Dictionary of node_name -> Node
:return: List of Link objects
"""
links_from_clab = []
for link in self.containerlab_data["topology"].get("links", []):
Expand All @@ -88,7 +98,6 @@ def _build_links(self, nodes):
source_node = self.format_node_name(source_node)
target_node = self.format_node_name(target_node)

# Add link only if both source and target nodes exist
if source_node in nodes and target_node in nodes:
links_from_clab.append(
{
Expand All @@ -114,10 +123,6 @@ def _build_links(self, nodes):
link_style=self.styles.get("link_style", ""),
src_label_style=self.styles.get("src_label_style", ""),
trgt_label_style=self.styles.get("trgt_label_style", ""),
entryY=link_data.get("entryY", 0),
exitY=link_data.get("exitY", 0),
entryX=link_data.get("entryX", 0),
exitX=link_data.get("exitX", 0),
direction="downstream",
)
upstream_link = Link(
Expand All @@ -129,16 +134,12 @@ def _build_links(self, nodes):
link_style=self.styles.get("link_style", ""),
src_label_style=self.styles.get("src_label_style", ""),
trgt_label_style=self.styles.get("trgt_label_style", ""),
entryY=link_data.get("entryY", 0),
exitY=link_data.get("exitY", 0),
entryX=link_data.get("entryX", 0),
exitX=link_data.get("exitX", 0),
direction="upstream",
)
links.append(downstream_link)
links.append(upstream_link)

# Add the links to the source and target nodes
# Attach links to nodes
source_node.add_link(downstream_link)
target_node.add_link(upstream_link)

Expand Down
5 changes: 3 additions & 2 deletions core/data/topology_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import yaml
import sys
import logging

logger = logging.getLogger(__name__)
Expand All @@ -11,12 +10,14 @@ class TopologyLoader:
"""
Loads containerlab topology data from a YAML file.
"""

def load(self, input_file: str) -> dict:
"""
Load the containerlab YAML topology file and return its contents as a dictionary.
Load the containerlab YAML topology file.
:param input_file: Path to the containerlab YAML file.
:return: Parsed containerlab topology data.
:raises TopologyLoaderError: If file not found or parse error occurs.
"""
logger.debug(f"Loading topology from file: {input_file}")
try:
Expand Down
140 changes: 52 additions & 88 deletions core/diagram/custom_drawio.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from N2G import drawio_diagram
import xml.etree.ElementTree as ET
from collections import defaultdict
import logging

logger = logging.getLogger(__name__)

class CustomDrawioDiagram(drawio_diagram):
# Overriding the drawio_diagram_xml with shadow=0
"""
Custom Drawio Diagram class extending N2G's drawio_diagram.
Provides additional functionality for grouping nodes and handling styles.
"""

# Overriding the base diagram XML to control page and style attributes.
drawio_diagram_xml = """
<diagram id="{id}" name="{name}">
<mxGraphModel dx="{width}" dy="{height}" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="827" pageHeight="1169" math="0" shadow="0" background="#000000">
Expand Down Expand Up @@ -36,16 +42,27 @@ def __init__(self, styles=None, node_duplicates="skip", link_duplicates="skip"):
"""

super().__init__(
node_duplicates,
link_duplicates,
node_duplicates=node_duplicates,
link_duplicates=link_duplicates,
)

def calculate_new_group_positions(self, obj_pos_old, group_pos):
# Adjust object positions relative to the new group's position
obj_pos_new = (obj_pos_old[0] - group_pos[0], obj_pos_old[1] - group_pos[1])
return obj_pos_new
"""
Adjust object positions relative to the new group's position.
:param obj_pos_old: (x,y) of the object's old position.
:param group_pos: (x,y) of the group's position.
:return: (x,y) of the new object position inside the group.
"""
return (obj_pos_old[0] - group_pos[0], obj_pos_old[1] - group_pos[1])

def update_style(self, styles):
"""
Update the diagram style based on the given styles dictionary.
:param styles: Dictionary containing style config.
"""
logger.debug("Updating diagram style with new background, shadow, grid, etc.")
background = styles["background"]
shadow = styles["shadow"]
grid = styles["grid"]
Expand All @@ -65,13 +82,19 @@ def update_style(self, styles):
self.drawing = ET.fromstring(self.drawio_drawing_xml)

def group_nodes(self, member_objects, group_id, style=""):
# Initialize bounding box coordinates
"""
Create a group cell in the diagram containing the specified member objects.
:param member_objects: List of object IDs to group.
:param group_id: Unique ID for the group.
:param style: Style string for the group cell.
"""
logger.debug(f"Grouping nodes {member_objects} into group '{group_id}'")
min_x = min_y = float("inf")
max_x = max_y = float("-inf")
object_positions = []

object_positions = [] # To store all object positions

# Process each member object to update the bounding box
# Calculate bounding box
for obj_id in member_objects:
obj_mxcell = self.current_root.find(f".//object[@id='{obj_id}']/mxCell")
if obj_mxcell is not None:
Expand All @@ -83,16 +106,13 @@ def group_nodes(self, member_objects, group_id, style=""):
float(geometry.get("height", "0")),
)

# Store object positions and update bounding box
object_positions.append((obj_id, x, y, width, height))
min_x, min_y = min(min_x, x), min(min_y, y)
max_x, max_y = max(max_x, x + width), max(max_y, y + height)

# Define the group's position and size based on the bounding box
group_x, group_y = min_x, min_y
group_width, group_height = max_x - min_x, max_y - min_y

# Create the group cell in the XML structure
group_cell_xml = f"""
<mxCell id="{group_id}" value="" style="{style}" vertex="1" connectable="0" parent="1">
<mxGeometry x="{group_x}" y="{group_y}" width="{group_width}" height="{group_height}" as="geometry" />
Expand All @@ -101,22 +121,16 @@ def group_nodes(self, member_objects, group_id, style=""):
group_cell = ET.fromstring(group_cell_xml)
self.current_root.append(group_cell)

# Update positions of all objects within the group
# Update positions of objects within group
for obj_id, x, y, _, _ in object_positions:
obj_pos_old = (x, y)
obj_pos_new = self.calculate_new_group_positions(
obj_pos_old, (group_x, group_y)
)

obj_pos_new = self.calculate_new_group_positions((x, y), (group_x, group_y))
obj_mxcell = self.current_root.find(f".//object[@id='{obj_id}']/mxCell")
if obj_mxcell is not None:
geometry = obj_mxcell.find("./mxGeometry")
if geometry is not None:
geometry.set("x", str(obj_pos_new[0]))
geometry.set("y", str(obj_pos_new[1]))
obj_mxcell.set(
"parent", group_id
) # Set the object's parent to the new group
obj_mxcell.set("parent", group_id)

def get_used_levels(self):
return set([node.graph_level for node in self.nodes.values()])
Expand All @@ -133,35 +147,13 @@ def get_links_from_nodes(self):
links.extend(node.get_all_links())
return links

def get_upstream_links_from_nodes(self):
links = []
for node in self.nodes.values():
links.extend(node.get_upstream_links())
return links

def get_downstream_links_from_nodes(self):
links = []
for node in self.nodes.values():
links.extend(node.get_downstream_links())
return links

def get_lateral_links_from_nodes(self):
links = []
for node in self.nodes.values():
links.extend(node.get_lateral_links())
return links

def get_target_link(self, source_link):
for link in self.get_links_from_nodes():
if (
link.source == source_link.target
and link.target == source_link.source
and (
link.direction != "lateral"
or link.direction == source_link.direction
)
and source_link.source_intf == link.target_intf
and source_link.target_intf == link.source_intf
and link.source_intf == source_link.target_intf
and link.target_intf == source_link.source_intf
):
return link
return None
Expand All @@ -175,18 +167,8 @@ def get_nodes_with_same_xy(self):

for node_id, node in self.nodes.items():
x, y = node.pos_x, node.pos_y

# Add node to nodes_with_same_x if x-coordinate matches
if x in nodes_with_same_x:
nodes_with_same_x[x].append(node)
else:
nodes_with_same_x[x] = [node]

# Add node to nodes_with_same_y if y-coordinate matches
if y in nodes_with_same_y:
nodes_with_same_y[y].append(node)
else:
nodes_with_same_y[y] = [node]
nodes_with_same_x.setdefault(x, []).append(node)
nodes_with_same_y.setdefault(y, []).append(node)

return nodes_with_same_x, nodes_with_same_y

Expand All @@ -195,51 +177,33 @@ def get_nodes_between_interconnected(self):
nodes_between_interconnected_x = []
nodes_between_interconnected_y = []

# Check vertical alignment
for coord, nodes in nodes_with_same_x.items():
for i in range(len(nodes)):
for j in range(i + 1, len(nodes)):
node1 = nodes[i]
node2 = nodes[j]
if node1.is_connected_to(node2):
# Check if there are any nodes between node1 and node2 based on their positions
for node_between in nodes:
if node_between != node1 and node_between != node2:
if (node1.pos_y < node_between.pos_y < node2.pos_y) or (
node2.pos_y < node_between.pos_y < node1.pos_y
):
if (
node_between
not in nodes_between_interconnected_x
):
nodes_between_interconnected_x.append(
node_between
)
if node_between not in (node1, node2):
if (node1.pos_y < node_between.pos_y < node2.pos_y) or (node2.pos_y < node_between.pos_y < node1.pos_y):
if node_between not in nodes_between_interconnected_x:
nodes_between_interconnected_x.append(node_between)

# Check horizontal alignment
for coord, nodes in nodes_with_same_y.items():
for i in range(len(nodes)):
for j in range(i + 1, len(nodes)):
node1 = nodes[i]
node2 = nodes[j]
if node1.is_connected_to(node2):
# Check if there are any nodes between node1 and node2 based on their positions
for node_between in nodes:
if node_between != node1 and node_between != node2:
if (node1.pos_x < node_between.pos_x < node2.pos_x) or (
node2.pos_x < node_between.pos_x < node1.pos_x
):
if (
node_between
not in nodes_between_interconnected_y
):
nodes_between_interconnected_y.append(
node_between
)
if node_between not in (node1, node2):
if (node1.pos_x < node_between.pos_x < node2.pos_x) or (node2.pos_x < node_between.pos_x < node1.pos_x):
if node_between not in nodes_between_interconnected_y:
nodes_between_interconnected_y.append(node_between)

return nodes_between_interconnected_x, nodes_between_interconnected_y

def get_nodes_by_level(self, level):
nodes_by_level = {}
for node in self.nodes.values():
if node.graph_level == level:
nodes_by_level[node.name] = node
return nodes_by_level
return {node.name: node for node in self.nodes.values() if node.graph_level == level}
Loading

0 comments on commit aa0b24a

Please sign in to comment.