Skip to content

Commit

Permalink
SSD parsing: code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
nl78 committed Dec 20, 2024
1 parent e955811 commit 464f50b
Showing 1 changed file with 66 additions and 45 deletions.
111 changes: 66 additions & 45 deletions fmu_manipulation_toolbox/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def __init__(self, filename: str, step_size=None, auto_link=True, auto_input=Tr
auto_output=True, mt=False, profiling=False, fmu_directory: Path = "."):
self.filename = Path(filename)
self.default_auto_input = auto_input
self.debug=debug
self.debug = debug
self.default_auto_output = auto_output
self.default_step_size = step_size
self.default_auto_link = auto_link
Expand Down Expand Up @@ -170,8 +170,8 @@ def write(self, filename: str):
def read_csv(self):
name = str(self.filename.with_suffix(".fmu"))
self.root = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link,
mt=self.default_mt, profiling=self.default_profiling, auto_input=self.default_auto_input,
auto_output=self.default_auto_output)
mt=self.default_mt, profiling=self.default_profiling,
auto_input=self.default_auto_input, auto_output=self.default_auto_output)

with open(self.description_pathname) as file:
reader = csv.reader(file, delimiter=';')
Expand Down Expand Up @@ -359,45 +359,10 @@ def json_decode_node(self, data) -> AssemblyNode:
node.add_drop_port(line[0], line[1])

return node

def read_ssp(self):
logger.warning("This feature is ALPHA stage.")
node_stack: List[AssemblyNode] = []

def start_element(tag_name, attrs):
if tag_name == 'ssd:Connection':
if 'startElement' in attrs:
if 'endElement' in attrs:
node_stack[-1].add_link(attrs['startElement'] + '.fmu', attrs['startConnector'],
attrs['endElement'] + '.fmu', attrs['endConnector'])
else:
node_stack[-1].add_output(attrs['startElement'] + '.fmu', attrs['startConnector'],
attrs['endConnector'])
else:
node_stack[-1].add_input(attrs['startConnector'],
attrs['endElement'] + '.fmu', attrs['endConnector'])

elif tag_name == 'ssd:System':
name = attrs['name']+".fmu"
logger.info(f"System: {name}")
node = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link,
mt=self.default_mt, profiling=self.default_profiling,
auto_input=self.default_auto_input, auto_output=self.default_auto_output)
if node_stack:
node_stack[-1].add_sub_node(node)
else:
self.root = node

node_stack.append(node)

elif tag_name == 'ssd:Component':
logger.info(f"Component: {attrs}")

def end_element(tag_name):
if tag_name == 'ssd:System':
node_stack.pop()

# TODO: handle nested SSD
with zipfile.ZipFile(self.fmu_directory / self.filename) as zin:
for file in zin.filelist:
target_filename = Path(file.filename).name
Expand All @@ -409,12 +374,68 @@ def end_element(tag_name):

self.description_pathname = self.fmu_directory / "SystemStructure.ssd"
if self.description_pathname.is_file():
logger.debug(f"Analysing {self.description_pathname}")
with open(self.description_pathname, "rb") as file:
parser = xml.parsers.expat.ParserCreate()
parser.StartElementHandler = start_element
parser.EndElementHandler = end_element
parser.ParseFile(file)
sdd = SSDParser(step_size=self.default_step_size, auto_link=self.default_auto_link,
mt=self.default_mt, profiling=self.default_profiling,
auto_input=self.default_auto_input, auto_output=self.default_auto_output)
self.root = sdd.parse(self.fmu_directory / "SystemStructure.ssd")

def make_fmu(self):
self.root.make_fmu(self.fmu_directory, debug=self.debug, description_pathname=self.description_pathname)


class SSDParser:
def __init__(self, **kwargs):
self.node_stack: List[AssemblyNode] = []
self.root = None
self.fmu_filenames: Dict[str, str] = {} # Component name => FMU filename
self.node_attrs = kwargs

def parse(self, ssd_filepath: Path) -> AssemblyNode:
logger.debug(f"Analysing {ssd_filepath}")
with open(ssd_filepath, "rb") as file:
parser = xml.parsers.expat.ParserCreate()
parser.StartElementHandler = self.start_element
parser.EndElementHandler = self.end_element
parser.ParseFile(file)

return self.root

def start_element(self, tag_name, attrs):
if tag_name == 'ssd:Connection':
if 'startElement' in attrs:
if 'endElement' in attrs:
fmu_start = self.fmu_filenames[attrs['startElement']]
fmu_end = self.fmu_filenames[attrs['endElement']]
self.node_stack[-1].add_link(fmu_start, attrs['startConnector'],
fmu_end, attrs['endConnector'])
else:
fmu_start = self.fmu_filenames[attrs['startElement']]
self.node_stack[-1].add_output(fmu_start, attrs['startConnector'],
attrs['endConnector'])
else:
fmu_end = self.fmu_filenames[attrs['endElement']]
self.node_stack[-1].add_input(attrs['startConnector'],
fmu_end, attrs['endConnector'])

elif tag_name == 'ssd:System':
logger.info(f"SSP System: {attrs['name']}")
filename = attrs['name'] + ".fmu"
self.fmu_filenames[attrs['name']] = filename
node = AssemblyNode(filename, **self.node_attrs)
if self.node_stack:
self.node_stack[-1].add_sub_node(node)
else:
self.root = node

self.node_stack.append(node)

elif tag_name == 'ssd:Component':
filename = Path(attrs['source']).name
name = attrs['name']
self.fmu_filenames[name] = filename
self.node_stack[-1].add_fmu(filename)
logger.debug(f"Component {name} => {filename}")

def end_element(self, tag_name):
if tag_name == 'ssd:System':
self.node_stack.pop()

0 comments on commit 464f50b

Please sign in to comment.