diff --git a/rosHumble_model_extractor_scripts/Ros_Extractor.py b/rosHumble_model_extractor_scripts/Ros_Extractor.py deleted file mode 100644 index 3519be5..0000000 --- a/rosHumble_model_extractor_scripts/Ros_Extractor.py +++ /dev/null @@ -1,435 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Nadia Hammoudeh Garcia -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import argparse -import subprocess -from ros_model_generator.rosmodel_generator import RosModelGenerator -import ros_metamodels.ros_metamodel_core as RosModelMetamodel - -import rospkg -from copy import deepcopy -#import ament_index_python -from haros.extractor import NodeExtractor, RoscppExtractor, RospyExtractor -from haros.metamodel import Node, Package, RosName, SourceFile -from haros.launch_parser import LaunchParser, LaunchParserError, NodeTag -from haros.cmake_parser import RosCMakeParser -from bonsai.analysis import CodeQuery, resolve_expression - -try: - from bonsai.cpp.clang_parser import CppAstParser -except ImportError: - CppAstParser = None -from bonsai.py.py_parser import PyAstParser - -class RosExtractor(): - def launch(self): - self.parse_arg() - ws = self.args.worspace_path - - #BONSAI PARSER - parser = CppAstParser(workspace = ws) - parser.set_library_path("/usr/lib/llvm-14/lib") - parser.set_standard_includes("/usr/lib/llvm-14/lib/clang/14.0.0/include") - - if (self.args.node): - self.extract_node(self.args.name, self.args.name, self.args.package_name, None, ws, None) - def extract_node(self, name, node_name, pkg_name, ns, ws, rossystem): - self.pkg = Package(pkg_name) - if os.environ.get("ROS_VERSION") == "1": - rospack = rospkg.RosPack() - self.pkg.path = rospack.get_path(pkg_name) - self.pkg_type="CatkinPackage" - elif os.environ.get("ROS_VERSION") == "2": - self.pkg.path= self.args.path_to_src - self.pkg_type="AmentPackage" - roscomponent = None - - #HAROS NODE EXTRACTOR - - srcdir = self.pkg.path[len(ws):] - srcdir = os.path.join(ws, srcdir.split(os.sep, 1)[0]) - bindir = os.path.join(ws, "build") - #HAROS CMAKE PARSER - parser = RosCMakeParser(srcdir, bindir, pkgs = [self.pkg]) - model_str = "" - if os.path.isfile(os.path.join(self.pkg.path, "CMakeLists.txt")): - parser.parse(os.path.join(self.pkg.path, "CMakeLists.txt")) - node_name_dict = deepcopy(parser.executables) - node_name_dict.update(deepcopy(parser.libraries)) - for target in node_name_dict.values(): - print("INFO: Found artifact: "+target.output_name) - if (self.args.a): - node_name = target.output_name - node = Node(node_name, self.pkg, rosname=RosName(node_name)) - for file_in in target.files: - full_path = file_in - relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] - file_name = full_path.rsplit('/', 1)[-1] - source_file = SourceFile(file_name, relative_path , self.pkg) - node.source_files.append(source_file) - else: - if target.output_name == node_name: - node = Node(node_name, self.pkg, rosname=RosName(node_name)) - for file_in in target.files: - full_path = file_in - relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] - file_name = full_path.rsplit('/', 1)[-1] - source_file = SourceFile(file_name, relative_path , self.pkg) - node.source_files.append(source_file) - else: - continue - if node.language == "cpp": - parser = CppAstParser(workspace = ws) - analysis = RoscppExtractor(self.pkg, ws) - - db_dir = os.path.join(ws, "build") - if os.path.isfile(os.path.join(db_dir, "compile_commands.json")): - parser.set_database(db_dir) - else: - print("The compile_commands.json file can't be found") - if node.language == "python": - parser = PyAstParser(workspace = ws) - analysis = RospyExtractor(self.pkg, ws) - #node.source_tree = parser.global_scope - for sf in node.source_files: - try: - if parser.parse(sf.path) is not None: - # ROS MODEL EXTRACT PRIMITIVES - if node.language == "python": - node_name=node_name.replace(".py","") - RosModel_node=RosModelMetamodel.Node(node_name) - try: - self.extract_primitives(node, parser, analysis, RosModel_node, roscomponent, pkg_name, node_name, node_name) - # SAVE ROS MODEL - ros_model = RosModelGenerator() - ros_model.create_model_from_node(self.pkg.name,node_name, RosModel_node, self.args.repo, self.pkg_type) - print("Save model in:") - print(self.args.model_path+"/"+node_name+".ros") - model_str = ros_model.generate_ros_model(self.args.model_path+"/"+node_name+".ros") - except error: - print("The interfaces can't be extracted "+error) - else: - print("The model couldn't be extracted") - except: - pass - if rossystem is not None and roscomponent is not None: - rossystem.add_component(roscomponent) - if self.args.output: - print(model_str) - - def transform_type(self, param_type): - if os.environ.get("ROS_VERSION") == "2": - param_type=str(param_type) - param_type=param_type[param_type.find("[")+1:param_type.find("]")] - if param_type == 'double': - return 'Double' - elif param_type == 'bool': - return 'Boolean' - elif param_type == 'int' or param_type == 'long': - return 'Integer' - elif (param_type == 'str' or 'basic_string' in param_type or param_type == 'std::string'): - return 'String' - #elif param_type == 'yaml': - #return 'Struct' - #elif 'std::vector' in param_type: - #return 'List' - else: - return None - - - def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent, pkg_name, node_name, art_name): - gs = parser.global_scope - node.source_tree = parser.global_scope - if os.environ.get("ROS_VERSION") == "1": - if node.language == "cpp": - #print(CodeQuery(gs).all_calls.get()) - for call in (CodeQuery(gs).all_calls.where_name("SimpleActionServer").get()): - if len(call.arguments) > 0: - name = analysis._extract_action(call) - action_type = analysis._extract_action_type(call).split("_<",1)[0] - RosModel_node.add_action_server(name,action_type.replace("/",".")) - #roscomponent.add_interface(name,"actsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("SimpleActionClient").get()): - if len(call.arguments) > 0: - name = analysis._extract_action(call) - action_type = analysis._extract_action_type(call).split("_<",1)[0] - RosModel_node.add_action_client.append(name,action_type.replace("/",".")) - #roscomponent.add_interface(name,"actcls", str(pkg_name)+"."+str(art_name)+"."+str(node_name)+"."+str(name)) - for call in (CodeQuery(gs).all_calls.where_name("advertise").where_result("ros::Publisher").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - RosModel_node.add_publisher(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"pubs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("subscribe").where_result("ros::Subscriber").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - RosModel_node.add_subscriber(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"subs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("advertiseService").where_result("ros::ServiceServer").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call) - srv_type = analysis._extract_message_type(call) - RosModel_node.add_service_server(name, srv_type.replace("/",".").replace("Request","")) - #roscomponent.add_interface(name,"srvsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("serviceClient").where_result("ros::ServiceClient").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call) - srv_type = analysis._extract_message_type(call) - RosModel_node.add_service_client(name, srv_type.replace("/",".").replace("Response","")) - #roscomponent.add_interface(name,"srvcls", pkg_name+"."+art_name+"."+node_name+"."+name) - - #PARAMETERS nhg:this needs review - nh_prefix = "c:@N@ros@S@NodeHandle@" - gets = ("getParam", "getParamCached", "param") - reads = gets + ("hasParam", "searchParam") - sets = ("setParam",) - writes = sets + ("deleteParam",) - for call in CodeQuery(gs).all_calls.where_name(reads).get(): - if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): - param_type = default_value = None - param_name = analysis._extract_topic(call) - if call.name in gets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - if call.name == "param": - if len(call.arguments) > 2: - default_value = analysis._extract_param_value( call, arg_pos=2) - elif len(call.arguments) == 2: - default_value = analysis._extract_param_value( call, arg_pos=1) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - - for call in CodeQuery(gs).all_calls.where_name(writes).get(): - if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): - param_type = value = None - param_name = analysis._extract_topic(call) - if len(call.arguments) >= 2 and call.name in sets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - value = analysis._extract_param_value(call, arg_pos=1) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - ros_prefix = "c:@N@ros@N@param@" - gets = ("get", "getCached", "param") - reads = gets + ("has",) - sets = ("set",) - writes = sets + ("del",) - for call in CodeQuery(gs).all_calls.where_name(reads).get(): - if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): - param_type = default_value = None - param_name = analysis._extract_topic(call) - if call.name == "param": - if call.name in gets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - if len(call.arguments) > 2: - default_value = analysis._extract_param_value(call, arg_pos=2) - elif len(call.arguments) == 2: - default_value = analysis._extract_param_value(call, arg_pos=1) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - for call in CodeQuery(gs).all_calls.where_name(writes).get(): - if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): - param_type = value = None - if len(call.arguments) >= 2 and call.name in sets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - value = analysis._extract_param_value(call, arg_pos=1) - param_name = analysis._extract_topic(call) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - - #PYTHON nhg:this needs review - if node.language == "python": - msgs_list = [] - pkgs_list = [] - for imp_name in parser.imported_names_list: - s = str(imp_name) - if "msg" in s or "srv" in s: - ss = s.split(".") - if len(ss) < 2: - continue - if ss[-1] == "msg" or ss[-1] == "srv": - pkgs_list.append(ss[0]) - elif ss[1] == "msg" or ss[1] == "srv": - msgs_list.append((ss[0], ss[2])) - else: - log.debug(("Python import with 'msg' or 'srv', " - "but unable to process it: ") - + s) - for call in CodeQuery(gs).all_calls.get(): - if "rospy.Publisher" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) - #queue_size = analysis._extract_queue_size(call ) - RosModel_node.add_publisher(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"pubs", pkg_name+"."+art_name+"."+node_name+"."+name) - if "rospy.Subscriber" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) - #queue_size = analysis._extract_queue_size(call ) - RosModel_node.add_subscriber(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"subs", pkg_name+"."+art_name+"."+node_name+"."+name) - if "rospy.Service" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call, '', msgs_list) - RosModel_node.add_service_server(name, srv_type.replace("/",".").replace("Request","")) - #roscomponent.add_interface(name,"srvsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) - if "rospy.ServiceProxy" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call, '', msgs_list) - RosModel_node.add_service_client(name, srv_type.replace("/",".").replace("Response","")) - #roscomponent.add_interface(name,"srvcls", pkg_name+"."+art_name+"."+node_name+"."+name) - #if "rospy.get_param" in str(call): - # print("PARAM GET:") - # print(call) - if "rospy.set_param" in str(call): - param_name = analysis._extract_topic(call, topic_pos=0) - param_type = default_value = None - default_value = resolve_expression(call.arguments[1]) - RosModel_node.add_parameter(param_name.replace(".","/"), default_value , param_type, None) - - if os.environ.get("ROS_VERSION") == "2": - #ROS2 - if node.language == "cpp": - for call in (CodeQuery(gs).all_calls.get()): - - if "Publisher" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or msg_type!="?": - RosModel_node.add_publisher(name, msg_type.replace("/",".").replace(".msg","")) - - if "Subscriber" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or msg_type!="?": - RosModel_node.add_subscriber(name, msg_type.replace("/",".").replace(".msg","")) - - if "Service" in str(call) or "::srv::" in str(call.function): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - print(name + " " + srv_type) - if name!="?" or srv_type!="?": - RosModel_node.add_service_server(name, srv_type.replace("/",".").replace(".srv","")) - - if "Client" in str(call) and "::srv::" in str(call): - #print(call) - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - print(name + " " + srv_type) - if name!="?" or srv_type!="?": - RosModel_node.add_service_client(name, srv_type.replace("/",".").replace(".srv","")) - if "Client" in str(call) and "::action::" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - act_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or act_type!="?": - RosModel_node.add_action_client(name, act_type.replace("/",".").replace(".action","")) - if "Server" in str(call) and "::action::" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - act_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or act_type!="?": - RosModel_node.add_action_server(name, act_type.replace("/",".").replace(".action","")) - #PARAMETERS ROS2 - params=[] - written_params=[] - for call in (CodeQuery(gs).all_calls.get()): - param_prefix = "c:@N@rclcpp@S@Node@F@" - declare_params = ("get_parameter","declare_parameter") - if (call.full_name.startswith("rclcpp::Node") or (isinstance(call.reference, str)) and call.reference.startswith(param_prefix)): - param_type = default_value = None - if(call.name in declare_params) and len(call.arguments) > 1: - param_name = analysis._extract_topic(call, topic_pos=0) - if len(call.arguments) == 2: - param_type= self.transform_type(resolve_expression(call.arguments[1])) - params.append((param_name, param_type)) - elif len(call.arguments) > 2 and not ('[rclcpp::ParameterValue] (default)' in str(resolve_expression(call.arguments[1]))): - default_value = resolve_expression(call.arguments[1]) - param_type = self.transform_type(resolve_expression(call)) - params.append((param_name, param_type, default_value)) - for parameter in params: - param_name_ = param_type_ = default_value_ = None - if len(parameter) > 2: - param_name_, param_type_, default_value_ = parameter - if not ((default_value_ is None or default_value_ == "") and param_type_ is None): - RosModel_node.add_parameter(param_name_.replace(".","/"), default_value_ , param_type_, None) - written_params.append(param_name_) - elif len(parameter) == 2: - param_name_, param_type_ = parameter - if not (param_type_ is None) and not (param_name_ in written_params): - RosModel_node.add_parameter(param_name_.replace(".","/"), default_value_ , param_type_, None) - elif node.language == "python": - msgs_list = [] - pkgs_list = [] - for imp_name in parser.imported_names_list: - s = str(imp_name) - print(imp_name) - if "msg" in s or "srv" in s: - ss = s.split(".") - if len(ss) < 2: - continue - if ss[-1] == "msg" or ss[-1] == "srv": - pkgs_list.append(ss[0]) - elif ss[1] == "msg" or ss[1] == "srv": - msgs_list.append((ss[0], ss[2])) - else: - log.debug(("Python import with 'msg' or 'srv', " - "but unable to process it: ") - + s) - for call in (CodeQuery(gs).all_calls.get()): - print(call) - if "create_subscription" in str(call): - print("Found subcription") - - - - def parse_arg(self): - parser = argparse.ArgumentParser() - mutually_exclusive = parser.add_mutually_exclusive_group() - mutually_exclusive.add_argument('--node', '-n', help="node analyse", action='store_true') - mutually_exclusive.add_argument('--launch', '-l', help="launch analyse", action='store_true') - parser.add_argument('--model-path', help='path to the folder in which the model files should be saved', - default='./', - nargs='?', const='./') - parser.add_argument('--output', help='print the model output') - parser.add_argument('--package', required=True, dest='package_name') - parser.add_argument('--name', required=False, dest='name') - parser.add_argument('--ws', required=True, dest='worspace_path') - parser.add_argument('--path-to-src', required=False, dest='path_to_src') - parser.add_argument('--repo', required=False, dest='repo') - parser.add_argument('-a', action='store_true') - self.args = parser.parse_args() - - diff --git a/rosHumble_model_extractor_scripts/file_handling.py b/rosHumble_model_extractor_scripts/file_handling.py deleted file mode 100644 index 5f3f60d..0000000 --- a/rosHumble_model_extractor_scripts/file_handling.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python -import sys -import os -import subprocess -from datetime import datetime -import shutil - -import glob - - -class fileHandling: - def __init__(self): - pass - - @staticmethod - def wrtieToFile(fileNameWithPath, contentToWrite): - with open(fileNameWithPath, "w") as file: - file.write(contentToWrite) - print("-----Write to {} completed-----".format(fileNameWithPath)) - - @staticmethod - def showFileContent(fileNameWithPath): - - print("-----Show the ROS model of {}".format(fileNameWithPath)) - with open(fileNameWithPath,'r') as file: - print(file.read()) - - @staticmethod - def filterFileNamesByExtension(rootDir, extensionToSearch): - - fileList = glob.glob(os.path.join(rootDir, '**', "*.{}".format(extensionToSearch)), recursive=True) - - return fileList \ No newline at end of file diff --git a/rosHumble_model_extractor_scripts/ros2runner.py b/rosHumble_model_extractor_scripts/ros2runner.py deleted file mode 100644 index 34b22e7..0000000 --- a/rosHumble_model_extractor_scripts/ros2runner.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python -import sys -import os -import subprocess -from datetime import datetime -import shutil -from rosHumble_model_extractor_scripts.ros_pkg_runner import rosPkgRunner -from rosHumble_model_extractor_scripts.file_handling import fileHandling -import glob - - - -class ros2Runner(rosPkgRunner): - - ROS2setupBashFile = "install/setup.bash" - ROS2Version = 2 - ROS2PythonVer = 3 - - - def __init__(self, - inputPkgName:str, - inputNodeName:str, - typeOfRequest:str, - pathToROSws:str, - gitRepo:str, - outputDir:str): - - super().__init__(inputPkgName=inputPkgName, - inputNodeName=inputNodeName, - typeOfRequest=typeOfRequest, - pathToROSws=pathToROSws, - gitRepo=gitRepo, - setupBashFile=self.ROS2setupBashFile, - outputDir=outputDir) - - def runColconBuild(self,): - subprocess.run(["colcon", "build", "--cmake-args"]) #CMake Colcon Warning fix - - def runColconList(self,): - os.system('colcon list > /tmp/colcon_list.txt') - - - - def calcPathToPkgSRC(self, textList:str): - - os.system('cat /tmp/colcon_list.txt | grep "^$1" | awk \'{ print $2}\' > /tmp/colcon_list_update.txt') - path_to_src_code=subprocess.run(["cat","/tmp/colcon_list_update.txt"], stdout=subprocess.PIPE, text=True).stdout - path_to_src_code=path_to_src_code.split("\n") - - self.setPathToSRCcode([os.path.join(self.getPathToROSws(), item) for item in path_to_src_code]) - self.validatePathToSRCcodeList() - - def startInstallPkgProcedure(self): - - self.runCommonCmd() - self.runColconBuild() - self.runSetupBash() - colconList = self.runColconList() - self.calcPathToPkgSRC(colconList) - print("## ROS pkg install and Build complete") - - def callHarosPlugin(self,): - - self.harosInit() - - - for pkgName in self.getPathToSRCcode(): - currOutDir = os.path.join(self.outputDir, pkgName.split('/')[-1]) - fileName = os.path.join(os.path.dirname(__file__), self.getROSModelExtractorPluginFile()) - extractorFile = currOutDir+"/extractor.log" - extractorContent = None - - if(not os.path.isdir(currOutDir)): - os.makedirs(currOutDir) - - if self.getInputNodeName() == "--all": - extractorContent = subprocess.run(["python3", fileName, - "--package", str(self.getInputPkgName()), - "--"+str(self.getTypeOfRequest()), - "--model-path",str(currOutDir) , - "--ws", str(self.getPathToROSws()), - "--path-to-src", pkgName, - "--repo", str(self.getGitModelRepo()), - "-a"], stdout=subprocess.PIPE, text=True).stdout - else: - extractorContent = subprocess.run(["python3", fileName, - "--package", str(self.getInputPkgName()), - "--name", str(self.getInputNodeName()), - "--"+str(self.getTypeOfRequest()), - "--model-path",str(currOutDir), - "--ws", str(self.getPathToROSws()), - "--path-to-src", pkgName, - "--repo", str(self.getGitModelRepo()) - ], stdout=subprocess.PIPE, text=True).stdout - fileHandling.wrtieToFile(extractorFile, extractorContent) - - def extractorRun(self,clearFlag=False): - - - #1. clone the git repo at the required - rosPkgRunner.cloneRepo(self.getPathToROSws(), self.getGitRepo()) - - #2. Change working directory to ROS ws - os.chdir(self.getPathToROSws()) - - #3. Install dependencies and build - self.startInstallPkgProcedure() - - #4. Init the plugin - self.callHarosPlugin() - - #5. Show the ros models - self.showAllROSmodel() - - #6. Clear the ROS ws SRC folder - if clearFlag: - self.cleanUPsrc() - - print( "------------------Done---------------") \ No newline at end of file diff --git a/rosHumble_model_extractor_scripts/rosHumble_model_extractor.py b/rosHumble_model_extractor_scripts/rosHumble_model_extractor.py deleted file mode 100644 index 6b40980..0000000 --- a/rosHumble_model_extractor_scripts/rosHumble_model_extractor.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Nadia Hammoudeh Garcia -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from Ros_Extractor import RosExtractor - - -class RosInterface: - def __init__(self, name, ref): - self.name = name - self.ref = ref - -class ros_component: - def __init__(self, name, ns): - self.name = ns+name if ns else name - self.ns = ns - self.pubs = [] - self.subs = [] - self.srvsrvs = [] - self.srvcls = [] - self.actsrvs = [] - self.actcls = [] - def add_interface(self, name, interface_type, ref): - if interface_type == "pubs": - self.pubs.append(RosInterface(name,ref)) - if interface_type == "subs": - self.subs.append(RosInterface(name,ref)) - if interface_type == "srvsrvs": - self.srvsrvs.append(RosInterface(name,ref)) - if interface_type == "srvcls": - self.srvcls.append(RosInterface(name,ref)) - if interface_type == "actsrvs": - self.actsrvs.append(RosInterface(name,ref)) - if interface_type == "actcls": - self.actcls.append(RosInterface(name,ref)) - -def main(argv = None): - extractor = RosExtractor() - if extractor.launch(): - return 0 - return 1 - -if __name__== "__main__": - main() diff --git a/rosHumble_model_extractor_scripts/ros_pkg_runner.py b/rosHumble_model_extractor_scripts/ros_pkg_runner.py deleted file mode 100644 index 318ff80..0000000 --- a/rosHumble_model_extractor_scripts/ros_pkg_runner.py +++ /dev/null @@ -1,169 +0,0 @@ -#!/usr/bin/env python -import sys -import os -import subprocess -from datetime import datetime -import shutil -from rosHumble_model_extractor_scripts.file_handling import fileHandling -import glob - -class rosPkgRunner: - - - - def __init__(self, - inputPkgName:str, - inputNodeName:str, - typeOfRequest:str, - outputDir:str, - pathToROSws:str, - gitRepo:str, - setupBashFile:str): - - self.setupBashFile = setupBashFile - self.outputDir = outputDir - - # Private attr - self._inputPkgName = inputPkgName - self._inputNodeName = inputNodeName - self._typeOfRequest = typeOfRequest - self._pathToROSws = pathToROSws - self._gitRepo = gitRepo - self._pathToSRCcode = None - - self._ROS_model_extractor_plugin_file = "rosHumble_model_extractor.py" - - self.mkOuputDir() - - - def getROSModelExtractorPluginFile(self,): - return self._ROS_model_extractor_plugin_file - - def getInputPkgName(self,): - return self._inputPkgName - - def getInputNodeName(self,): - return self._inputNodeName - - def getTypeOfRequest(self,): - return self._typeOfRequest - - def getPathToROSws(self,): - return self._pathToROSws - - def getGitRepo(self,): - return self._gitRepo - - def runSetupBash(self,): - if(os.path.isfile(self.setupBashFile)): - filePath = os.path.join(self.getPathToROSws(), self.setupBashFile) - subprocess.run(['source', filePath], executable="/bin/bash") - - def getPathToSRCcode(self,): - return self._pathToSRCcode - - def setPathToSRCcode(self, newValue): - self._pathToSRCcode = newValue - - def getGitModelRepo(self): - return self.getGitRepo().split(' ')[0] - - - def runROSdepInstall(self,): - subprocess.run(["rosdep", "install", "-y", "-i", "-r", "--from-path", "src"]) - - def runCommonCmd(self,): - self.runSetupBash() - self.runROSdepInstall() - - - def startInstallProcedure(self,): - print("Add additoinal install procedure for respective ROS version") - - def harosInit(self,): - print("## Init HAROS ##") - subprocess.run(["haros","init"]) - - def finishedMsg(self,extractorFile): - print("~~~~~~~~~~~") - print("Extraction finished. See the following report:") - subprocess.run(["cat", extractorFile]) - print("~~~~~~~~~~~\n\n") - - def showROSmodel(self, modelFile): - fileHandling.showFileContent(modelFile) - - - def showAllROSmodel(self,): - - fileList = fileHandling.filterFileNamesByExtension(self.outputDir, 'ros') - - for fileName in fileList: - self.showROSmodel(fileName) - - def cleanUPsrc(self): - - pathToCheck = rosPkgRunner.subDirList(os.path.join(self.getPathToROSws(), 'src')) - for dirs in pathToCheck: - print( "Delete: ", dirs) - shutil.rmtree(dirs) - - def callHarosPlugin(self,): - - print("Update code to execute harosPlugin command!!") - - def mkOuputDir(self,): - - current_timestamp = str(datetime.now().strftime("%Y-%m-%d %H-%M-%S")) - current_timestamp = current_timestamp.split(' ') - output_dir = os.path.join(self.outputDir, current_timestamp[0], current_timestamp[-1]) - - if(not os.path.isdir(output_dir)): - os.makedirs(output_dir) - - self.outputDir = output_dir - print("## Output Directory Created at : {} ##".format(output_dir)) - - def extractorRun(self,): - print(" Write the procedure to run the model extractor file with configuration:") - - @staticmethod - def checkIfValidPkgDir(dirToCheck:str, fileToCheck='package.xml' ): - fileList = os.listdir(dirToCheck) - fileList = [f for f in fileList if os.path.isfile(dirToCheck+'/'+f)] - - numOfFileToCheck = fileList.count(fileToCheck) - if numOfFileToCheck == 1: - return True - else: - if numOfFileToCheck > 0: - print("### More than one {} found in {}, rejected as valid pakage directory ###".format(fileToCheck, dirToCheck)) - return False - - def validatePathToSRCcodeList(self,): - newList = [] - for dirName in self.getPathToSRCcode(): - if rosPkgRunner.checkIfValidPkgDir(dirName): - newList.append(dirName) - - self.setPathToSRCcode(newList) - - @staticmethod - def cloneRepo(ws_dir:str, repo_url:str): - clone_cmd = ["git", "clone" ] - - src_dir = os.path.join(ws_dir, "src/") - print(ws_dir, repo_url, type(ws_dir), src_dir) - os.chdir(src_dir) - repo_url = repo_url.split(' ') - clone_cmd = clone_cmd + repo_url - subprocess.run(clone_cmd) - print("Repository cloned successfully.") - os.chdir(ws_dir) - - - - @staticmethod - def subDirList(parent_dir): - subdirs = [os.path.join(parent_dir, d) for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d))] - return subdirs \ No newline at end of file diff --git a/ros_model_extractor.py b/ros_model_extractor.py index e39782b..7999b5f 100755 --- a/ros_model_extractor.py +++ b/ros_model_extractor.py @@ -23,6 +23,7 @@ import ros_metamodels.ros_metamodel_core as RosModelMetamodel import rospkg +from copy import deepcopy #import ament_index_python from haros.extractor import NodeExtractor, RoscppExtractor, RospyExtractor from haros.metamodel import Node, Package, RosName, SourceFile @@ -73,7 +74,9 @@ def extract_node(self, name, node_name, pkg_name, ns, ws, rossystem): model_str = "" if os.path.isfile(os.path.join(self.pkg.path, "CMakeLists.txt")): parser.parse(os.path.join(self.pkg.path, "CMakeLists.txt")) - for target in parser.executables.values(): + node_name_dict = deepcopy(parser.executables) + node_name_dict.update(deepcopy(parser.libraries)) + for target in node_name_dict.values(): print("INFO: Found artifact: "+target.output_name) if (self.args.a): node_name = target.output_name @@ -316,8 +319,7 @@ def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent queue_size = analysis._extract_queue_size(call, queue_pos=1) if name!="?" or msg_type!="?": RosModel_node.add_publisher(name, msg_type.replace("/",".").replace(".msg","")) - for call in (CodeQuery(gs).all_calls.get()): - if "Subscription" in str(call): + if "Subscription" in str(call): # Subscription or Subscriber? #print(call) if len(call.arguments) > 1: name = analysis._extract_topic(call, topic_pos=0) @@ -325,8 +327,7 @@ def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent queue_size = analysis._extract_queue_size(call, queue_pos=1) if name!="?" or msg_type!="?": RosModel_node.add_subscriber(name, msg_type.replace("/",".").replace(".msg","")) - for call in (CodeQuery(gs).all_calls.get()): - if "Service" in str(call) and "::srv::" in str(call): + if "Service" in str(call) and "::srv::" in str(call): #or? #print(call) if len(call.arguments) > 1: name = analysis._extract_topic(call, topic_pos=0) @@ -335,7 +336,6 @@ def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent print(name + " " + srv_type) if name!="?" or srv_type!="?": RosModel_node.add_service_server(name, srv_type.replace("/",".").replace(".srv","")) - for call in (CodeQuery(gs).all_calls.get()): if "Client" in str(call) and "::srv::" in str(call): #print(call) if len(call.arguments) > 1: