From 464f50b585606c833b5071a1ea719a569ca2a4b1 Mon Sep 17 00:00:00 2001 From: Nicolas LAURENT Date: Fri, 20 Dec 2024 11:02:56 +0100 Subject: [PATCH] SSD parsing: code refactoring --- fmu_manipulation_toolbox/assembly.py | 111 ++++++++++++++++----------- 1 file changed, 66 insertions(+), 45 deletions(-) diff --git a/fmu_manipulation_toolbox/assembly.py b/fmu_manipulation_toolbox/assembly.py index 908685d..25427f9 100644 --- a/fmu_manipulation_toolbox/assembly.py +++ b/fmu_manipulation_toolbox/assembly.py @@ -124,7 +124,7 @@ def __init__(self, filename: str, step_size=None, auto_link=True, auto_input=Tr auto_output=True, mt=False, profiling=False, fmu_directory: Path = "."): self.filename = Path(filename) self.default_auto_input = auto_input - self.debug=debug + self.debug = debug self.default_auto_output = auto_output self.default_step_size = step_size self.default_auto_link = auto_link @@ -170,8 +170,8 @@ def write(self, filename: str): def read_csv(self): name = str(self.filename.with_suffix(".fmu")) self.root = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link, - mt=self.default_mt, profiling=self.default_profiling, auto_input=self.default_auto_input, - auto_output=self.default_auto_output) + mt=self.default_mt, profiling=self.default_profiling, + auto_input=self.default_auto_input, auto_output=self.default_auto_output) with open(self.description_pathname) as file: reader = csv.reader(file, delimiter=';') @@ -359,45 +359,10 @@ def json_decode_node(self, data) -> AssemblyNode: node.add_drop_port(line[0], line[1]) return node - + def read_ssp(self): logger.warning("This feature is ALPHA stage.") - node_stack: List[AssemblyNode] = [] - - def start_element(tag_name, attrs): - if tag_name == 'ssd:Connection': - if 'startElement' in attrs: - if 'endElement' in attrs: - node_stack[-1].add_link(attrs['startElement'] + '.fmu', attrs['startConnector'], - attrs['endElement'] + '.fmu', attrs['endConnector']) - else: - node_stack[-1].add_output(attrs['startElement'] + '.fmu', attrs['startConnector'], - attrs['endConnector']) - else: - node_stack[-1].add_input(attrs['startConnector'], - attrs['endElement'] + '.fmu', attrs['endConnector']) - - elif tag_name == 'ssd:System': - name = attrs['name']+".fmu" - logger.info(f"System: {name}") - node = AssemblyNode(name, step_size=self.default_step_size, auto_link=self.default_auto_link, - mt=self.default_mt, profiling=self.default_profiling, - auto_input=self.default_auto_input, auto_output=self.default_auto_output) - if node_stack: - node_stack[-1].add_sub_node(node) - else: - self.root = node - - node_stack.append(node) - - elif tag_name == 'ssd:Component': - logger.info(f"Component: {attrs}") - def end_element(tag_name): - if tag_name == 'ssd:System': - node_stack.pop() - - # TODO: handle nested SSD with zipfile.ZipFile(self.fmu_directory / self.filename) as zin: for file in zin.filelist: target_filename = Path(file.filename).name @@ -409,12 +374,68 @@ def end_element(tag_name): self.description_pathname = self.fmu_directory / "SystemStructure.ssd" if self.description_pathname.is_file(): - logger.debug(f"Analysing {self.description_pathname}") - with open(self.description_pathname, "rb") as file: - parser = xml.parsers.expat.ParserCreate() - parser.StartElementHandler = start_element - parser.EndElementHandler = end_element - parser.ParseFile(file) + sdd = SSDParser(step_size=self.default_step_size, auto_link=self.default_auto_link, + mt=self.default_mt, profiling=self.default_profiling, + auto_input=self.default_auto_input, auto_output=self.default_auto_output) + self.root = sdd.parse(self.fmu_directory / "SystemStructure.ssd") def make_fmu(self): self.root.make_fmu(self.fmu_directory, debug=self.debug, description_pathname=self.description_pathname) + + +class SSDParser: + def __init__(self, **kwargs): + self.node_stack: List[AssemblyNode] = [] + self.root = None + self.fmu_filenames: Dict[str, str] = {} # Component name => FMU filename + self.node_attrs = kwargs + + def parse(self, ssd_filepath: Path) -> AssemblyNode: + logger.debug(f"Analysing {ssd_filepath}") + with open(ssd_filepath, "rb") as file: + parser = xml.parsers.expat.ParserCreate() + parser.StartElementHandler = self.start_element + parser.EndElementHandler = self.end_element + parser.ParseFile(file) + + return self.root + + def start_element(self, tag_name, attrs): + if tag_name == 'ssd:Connection': + if 'startElement' in attrs: + if 'endElement' in attrs: + fmu_start = self.fmu_filenames[attrs['startElement']] + fmu_end = self.fmu_filenames[attrs['endElement']] + self.node_stack[-1].add_link(fmu_start, attrs['startConnector'], + fmu_end, attrs['endConnector']) + else: + fmu_start = self.fmu_filenames[attrs['startElement']] + self.node_stack[-1].add_output(fmu_start, attrs['startConnector'], + attrs['endConnector']) + else: + fmu_end = self.fmu_filenames[attrs['endElement']] + self.node_stack[-1].add_input(attrs['startConnector'], + fmu_end, attrs['endConnector']) + + elif tag_name == 'ssd:System': + logger.info(f"SSP System: {attrs['name']}") + filename = attrs['name'] + ".fmu" + self.fmu_filenames[attrs['name']] = filename + node = AssemblyNode(filename, **self.node_attrs) + if self.node_stack: + self.node_stack[-1].add_sub_node(node) + else: + self.root = node + + self.node_stack.append(node) + + elif tag_name == 'ssd:Component': + filename = Path(attrs['source']).name + name = attrs['name'] + self.fmu_filenames[name] = filename + self.node_stack[-1].add_fmu(filename) + logger.debug(f"Component {name} => {filename}") + + def end_element(self, tag_name): + if tag_name == 'ssd:System': + self.node_stack.pop()