diff --git a/pyproject.toml b/pyproject.toml index f574c88d453..03c5690080f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,10 @@ name = "ansys.pyensight.core" [tool.coverage.run] branch = true +omit = [ + "*/locallauncher.py", + "*/adr.py" +] [tool.coverage.report] exclude_lines = [ diff --git a/src/ansys/pyensight/core/dockerlauncher.py b/src/ansys/pyensight/core/dockerlauncher.py index bfb3a74bfa7..2b498a7e7e1 100644 --- a/src/ansys/pyensight/core/dockerlauncher.py +++ b/src/ansys/pyensight/core/dockerlauncher.py @@ -555,7 +555,7 @@ def connect(self): if self._pim_instance is None: ws_port = self._service_host_port["ws"][1] else: - ws_port = self._service_host_port["http"][1] + ws_port = self._service_host_port["http"][1] # pragma: no cover session = ansys.pyensight.core.session.Session( host=self._service_host_port["grpc_private"][0], grpc_port=self._service_host_port["grpc_private"][1], diff --git a/src/ansys/pyensight/core/enshell_grpc.py b/src/ansys/pyensight/core/enshell_grpc.py index bf4a3a430bf..66367fda8dd 100644 --- a/src/ansys/pyensight/core/enshell_grpc.py +++ b/src/ansys/pyensight/core/enshell_grpc.py @@ -213,12 +213,12 @@ def connect(self, timeout: Optional[float] = 15.0): ) try: grpc.channel_ready_future(self._channel).result(timeout=timeout) - except grpc.FutureTimeoutError: - self._channel = None - return + except grpc.FutureTimeoutError: # pragma: no cover + self._channel = None # pragma: no cover + return # pragma: no cover self._stub = enshell_pb2_grpc.EnShellServiceStub(self._channel) - def connect_existing_channel(self, channel: grpc.Channel): + def connect_existing_channel(self, channel: grpc.Channel): # pragma: no cover """Establish a connection to an EnShell gRPC server. Attempt to connect to an EnShell gRPC server using the host and port @@ -268,15 +268,15 @@ def run_command(self, command_string: str): A tuple of (int, string) for (returnCode, returnString) """ self.connect() - if not self._stub: - return (0, "") + if not self._stub: # pragma: no cover + return (0, "") # pragma: no cover try: response = self._stub.run_command( enshell_pb2.EnShellCommandLine(command_line=command_string), metadata=self.metadata(), ) - except Exception: - raise IOError("gRPC connection dropped") + except Exception: # pragma: no cover + raise IOError("gRPC connection dropped") # pragma: no cover return (response.ret, response.response) @@ -300,8 +300,8 @@ def run_command_with_env(self, command_string: str, env_string: str): A tuple of (int, string) for (returnCode, returnString) """ self.connect() - if not self._stub: - return (0, "") + if not self._stub: # pragma: no cover + return (0, "") # pragma: no cover try: response = self._stub.run_command_with_env( enshell_pb2.EnShellCommandWithEnvLine( @@ -309,8 +309,8 @@ def run_command_with_env(self, command_string: str, env_string: str): ), metadata=self.metadata(), ) - except Exception: - raise IOError("gRPC connection dropped") + except Exception: # pragma: no cover + raise IOError("gRPC connection dropped") # pragma: no cover return (response.ret, response.response) @@ -350,10 +350,10 @@ def start_ensight(self, ensight_args: Optional[str] = None, ensight_env: Optiona if ensight_args and (ensight_args != ""): command_string += " " + ensight_args - if ensight_env is None or ensight_env == "": + if ensight_env is None or ensight_env == "": # pragma: no cover return self.run_command(command_string) else: - return self.run_command_with_env(command_string, ensight_env) + return self.run_command_with_env(command_string, ensight_env) # pragma: no cover # @brief # @@ -380,10 +380,10 @@ def start_other(self, cmd: str, extra_env: Optional[str] = None): self.connect() command_string = "start_app OTHER " + cmd - if extra_env is None or extra_env == "": + if extra_env is None or extra_env == "": # pragma: no cover return self.run_command(command_string) else: - return self.run_command_with_env(command_string, extra_env) + return self.run_command_with_env(command_string, extra_env) # pragma: no cover def cei_home(self): """Get the value of CEI_HOME from EnShell.""" @@ -403,24 +403,30 @@ def _get_cei_home(self): command_string = "show_ceihome" ret = self.run_command(command_string) # logging.debug(f"{command_string} :: ret = {ret}\n") - if ret[0] != 0: - self._cei_home = None - raise RuntimeError("Error getting printenv from EnShell") + if ret[0] != 0: # pragma: no cover + self._cei_home = None # pragma: no cover + raise RuntimeError("Error getting printenv from EnShell") # pragma: no cover # split the newline delimited string into a list of strings env_vars = ret[1].strip().split("\n") # find the string containing CEI_HOME cei_home_line = [x for x in env_vars if "CEI_HOME" in x][0] - if cei_home_line is None: - raise RuntimeError("Error getting CEI_HOME env var from the Docker container.\n{ret}\n") + if cei_home_line is None: # pragma: no cover + raise RuntimeError( + "Error getting CEI_HOME env var from the Docker container.\n{ret}\n" + ) # pragma: no cover # CEI_HOME is everything after the equal sign equal_sign_loc = cei_home_line.find("=") - if equal_sign_loc < 0: - raise RuntimeError("Error getting CEI_HOME env var from the Docker container.\n{ret}\n") + if equal_sign_loc < 0: # pragma: no cover + raise RuntimeError( + "Error getting CEI_HOME env var from the Docker container.\n{ret}\n" + ) # pragma: no cover self._cei_home = cei_home_line[equal_sign_loc + 1 :] m = re.search("/v(\d\d\d)/", self._cei_home) - if not m: - self.stop_server() - raise RuntimeError("Can't find version from cei_home in the Docker container.\n{ret}\n") + if not m: # pragma: no cover + self.stop_server() # pragma: no cover + raise RuntimeError( + "Can't find version from cei_home in the Docker container.\n{ret}\n" + ) # pragma: no cover self._ansys_version = m.group(1) diff --git a/src/ansys/pyensight/core/launch_ensight.py b/src/ansys/pyensight/core/launch_ensight.py index 77d92cf9d63..3812ed3a174 100644 --- a/src/ansys/pyensight/core/launch_ensight.py +++ b/src/ansys/pyensight/core/launch_ensight.py @@ -17,13 +17,13 @@ from ansys.pyensight.core.locallauncher import LocalLauncher from ansys.pyensight.core.session import Session -pim_is_available = False +pim_is_available = False # pragma: no cover try: import ansys.platform.instancemanagement as pypim - pim_is_available = True -except Exception: - pass + pim_is_available = True # pragma: no cover +except Exception: # pragma: no cover + pass # pragma: no cover logging.debug(f"pim_is_available: {pim_is_available}\n") docker_is_available = False @@ -164,10 +164,10 @@ def launch_ensight( return launcher.start() # use local installation of EnSight - launcher = LocalLauncher( - ansys_installation=ansys_installation, - application=application, - batch=batch, - **kwargs, - ) - return launcher.start() + launcher = LocalLauncher( # pragma: no cover + ansys_installation=ansys_installation, # pragma: no cover + application=application, # pragma: no cover + batch=batch, # pragma: no cover + **kwargs, # pragma: no cover + ) # pragma: no cover + return launcher.start() # pragma: no cover diff --git a/src/ansys/pyensight/core/launcher.py b/src/ansys/pyensight/core/launcher.py index 032f02c19fd..5dd9050be58 100644 --- a/src/ansys/pyensight/core/launcher.py +++ b/src/ansys/pyensight/core/launcher.py @@ -201,14 +201,14 @@ def _find_unused_ports(count: int, avoid: Optional[List[int]] = None) -> Optiona # There have been some issues with 65534+ so we stop at 65530 port = base_port % port_mod # port 0 is special - if port == 0: - continue + if port == 0: # pragma: no cover + continue # pragma: no cover # avoid admin ports - if port < 1024: - continue + if port < 1024: # pragma: no cover + continue # pragma: no cover # are we supposed to skip this one? - if port in avoid: - continue + if port in avoid: # pragma: no cover + continue # pragma: no cover # is anyone listening? sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(("127.0.0.1", port)) @@ -219,8 +219,8 @@ def _find_unused_ports(count: int, avoid: Optional[List[int]] = None) -> Optiona if len(ports) >= count: return ports # in case we failed... - if len(ports) < count: - return None + if len(ports) < count: # pragma: no cover + return None # pragma: no cover return ports def _use_egl(self) -> bool: @@ -240,12 +240,12 @@ def _use_egl(self) -> bool: # if the system can't do it, return False now return False - if self._egl_env_val is not None: + if self._egl_env_val is not None: # pragma: no cover # if the environment variable was set, that overrides the constructor option return self._egl_env_val # otherwise, use the arg passed to the constructor - return self._use_egl_param_val + return self._use_egl_param_val # pragma: no cover def _is_system_egl_capable(self) -> bool: # pragma: no cover """Return True if the system supports the EGL launch. @@ -292,8 +292,8 @@ def _add_query_parameters(self, params: Dict[str, str]) -> None: params: dict : query parameters to add to overall dict """ - for item, value in params.items(): - self._query_parameters[item] = value + for item, value in params.items(): # pragma: no cover + self._query_parameters[item] = value # pragma: no cover def _delete_query_parameters(self, params: List[str]) -> None: """Delete query parameters supplied by params from the @@ -304,8 +304,8 @@ def _delete_query_parameters(self, params: List[str]) -> None: params: list : query parameters to delete from the overall dict """ - for item in params: - try: - del self._query_parameters[item] - except Exception: - pass + for item in params: # pragma: no cover + try: # pragma: no cover + del self._query_parameters[item] # pragma: no cover + except Exception: # pragma: no cover + pass # pragma: no cover diff --git a/src/ansys/pyensight/core/utils/export.py b/src/ansys/pyensight/core/utils/export.py index 100ad8b3cab..bac31bc5b41 100644 --- a/src/ansys/pyensight/core/utils/export.py +++ b/src/ansys/pyensight/core/utils/export.py @@ -49,8 +49,8 @@ def _remote_support_check(self): RuntimeError if the module is not present. """ # if a module, then we are inside EnSight - if isinstance(self._ensight, ModuleType): - return + if isinstance(self._ensight, ModuleType): # pragma: no cover + return # pragma: no cover try: _ = self._ensight._session.cmd("dir(ensight.utils.export)") except RuntimeError: @@ -108,8 +108,10 @@ def image( if height is None: height = win_size[1] - if isinstance(self._ensight, ModuleType): - raw_image = self._image_remote(width, height, passes, enhanced, raytrace) + if isinstance(self._ensight, ModuleType): # pragma: no cover + raw_image = self._image_remote( + width, height, passes, enhanced, raytrace + ) # pragma: no cover else: cmd = f"ensight.utils.export._image_remote({width}, {height}, {passes}, " cmd += f"{enhanced}, {raytrace})" @@ -348,8 +350,8 @@ def animation( or no FLIPBOOK/KEYFRAME defined." ) - if isinstance(self._ensight, ModuleType): - raw_mpeg4 = self._animation_remote( + if isinstance(self._ensight, ModuleType): # pragma: no cover + raw_mpeg4 = self._animation_remote( # pragma: no cover width, height, passes, @@ -557,8 +559,8 @@ def geometry( delta_timestep = 1 self._remote_support_check() raw_data_list = None - if isinstance(self._ensight, ModuleType): - raw_data_list = self._geometry_remote( + if isinstance(self._ensight, ModuleType): # pragma: no cover + raw_data_list = self._geometry_remote( # pragma: no cover format, starting_timestep=starting_timestep, frames=frames, diff --git a/src/ansys/pyensight/core/utils/parts.py b/src/ansys/pyensight/core/utils/parts.py index 8c73dbd77a7..f943f35a2e2 100644 --- a/src/ansys/pyensight/core/utils/parts.py +++ b/src/ansys/pyensight/core/utils/parts.py @@ -58,12 +58,12 @@ def convert_variable( class Parts: """Controls the parts in the current EnSight ``Session`` instance.""" - class _EnSEmitterPoint(ens_emitterobj): - def __init__( + class _EnSEmitterPoint(ens_emitterobj): # pragma: no cover + def __init__( # pragma: no cover self, ensight: "ensight", point1: Optional[List[float]] = [0, 0, 0], - ): + ): # pragma: no cover if not isinstance(ensight, ModuleType): raise RuntimeError( "The class cannot be used directly in PyEnSight. It should not be used directly even in EnSight" @@ -73,8 +73,8 @@ def __init__( self.ensight.view_transf.cursor(*point1) self.CENTROID = point1 - class _EnSEmitterGrid(ens_emitterobj): - def __init__( + class _EnSEmitterGrid(ens_emitterobj): # pragma: no cover + def __init__( # pragma: no cover self, ensight: "ensight", point1: Optional[List[float]] = [0, 0, 0], @@ -83,7 +83,7 @@ def __init__( point4: Optional[List[float]] = [0, 0, 0], num_points_x: Optional[int] = 25, num_points_y: Optional[int] = 25, - ): + ): # pragma: no cover if not isinstance(ensight, ModuleType): raise RuntimeError( "The class cannot be used directly in PyEnSight. It should not be used directly even in EnSight" @@ -100,14 +100,14 @@ def __init__( self.NUM_POINTS_X = num_points_x self.NUM_POINTS_Y = num_points_y - class _EnSEmitterLine(ens_emitterobj): - def __init__( + class _EnSEmitterLine(ens_emitterobj): # pragma: no cover + def __init__( # pragma: no cover self, ensight: "ensight", point1: Optional[List[float]] = [0, 0, 0], point2: Optional[List[float]] = [0, 0, 0], num_points: Optional[int] = 100, - ): + ): # pragma: no cover if not isinstance(ensight, ModuleType): raise RuntimeError( "The class cannot be used directly in PyEnSight. It should not be used directly even in EnSight" @@ -120,14 +120,14 @@ def __init__( self.POINT2 = point2 self.NUM_POINTS = num_points - class _EnSEmitterPart(ens_emitterobj): - def __init__( + class _EnSEmitterPart(ens_emitterobj): # pragma: no cover + def __init__( # pragma: no cover self, ensight: "ensight", part: Optional[Any] = None, part_kind: Optional[Any] = 0, num_points: Optional[int] = 100, - ): + ): # pragma: no cover if not isinstance(ensight, ModuleType): raise RuntimeError( "The class cannot be used directly in PyEnSight. It should not be used directly even in EnSight" @@ -257,8 +257,10 @@ def _create_emitters( if not points: raise RuntimeError("list of points needed if particle trace emitted from points") for p in points: - if isinstance(self.ensight, ModuleType): - new_emitters.append(self._EnSEmitterPoint(self.ensight, point1=p)) + if isinstance(self.ensight, ModuleType): # pragma: no cover + new_emitters.append( + self._EnSEmitterPoint(self.ensight, point1=p) + ) # pragma: no cover else: new_emitters.append( f"ensight.utils.parts._EnSEmitterPoint(ensight, point1={p})" @@ -266,8 +268,8 @@ def _create_emitters( elif emitter_type == self._EMIT_LINE: if not any([point1, point2]): raise RuntimeError("point1 and point2 needed if particle trace emitted from line") - if isinstance(self.ensight, ModuleType): - new_emitters.append( + if isinstance(self.ensight, ModuleType): # pragma: no cover + new_emitters.append( # pragma: no cover self._EnSEmitterLine( self.ensight, point1=point1, point2=point2, num_points=num_points ) @@ -281,8 +283,8 @@ def _create_emitters( raise RuntimeError( "point1, point2 and point3 needed if particle trace emitted from plane" ) - if isinstance(self.ensight, ModuleType): - new_emitters.append( + if isinstance(self.ensight, ModuleType): # pragma: no cover + new_emitters.append( # pragma: no cover self._EnSEmitterGrid( self.ensight, point1=point1, @@ -300,8 +302,8 @@ def _create_emitters( if not parts: raise RuntimeError("part and num_points needed if particle trace emitted from part") for p in parts: - if isinstance(self.ensight, ModuleType): - new_emitters.append( + if isinstance(self.ensight, ModuleType): # pragma: no cover + new_emitters.append( # pragma: no cover self._EnSEmitterPart( self.ensight, part=p, @@ -370,13 +372,13 @@ def _add_emitters_to_particle_trace_part( clean: Optional[bool] = False, ) -> "ENS_PART_PARTICLE_TRACE": """Private utility to add emitters to an existing particle trace part.""" - if isinstance(self.ensight, ModuleType): - if clean: - emitters = [] - else: - emitters = particle_trace_part.EMITTERS.copy() - emitters.extend(new_emitters) - particle_trace_part.EMITTERS = emitters + if isinstance(self.ensight, ModuleType): # pragma: no cover + if clean: # pragma: no cover + emitters = [] # pragma: no cover + else: # pragma: no cover + emitters = particle_trace_part.EMITTERS.copy() # pragma: no cover + emitters.extend(new_emitters) # pragma: no cover + particle_trace_part.EMITTERS = emitters # pragma: no cover else: if clean: self.ensight._session.cmd("enscl.emitters=[]", do_eval=False) diff --git a/src/ansys/pyensight/core/utils/variables.py b/src/ansys/pyensight/core/utils/variables.py index 143f08ef6e6..8a3cd87bda6 100644 --- a/src/ansys/pyensight/core/utils/variables.py +++ b/src/ansys/pyensight/core/utils/variables.py @@ -117,8 +117,8 @@ def _move_var_to_elem( calc_string = "(plist," + var_name + ")" per_node_var = var_name + calc_var_name + " = NodeToElem" temp_string = per_node_var + calc_string - if not self._calc_var(pobj_list, temp_string): - raise RuntimeError("Failed to calculate elemental variable") + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Failed to calculate elemental variable") # pragma: no cover else: print( "Using elemental variable that already exists: {}".format(ret_val.DESCRIPTION) @@ -158,7 +158,7 @@ def _calc_var( err = self.ensight.variables.evaluate(calc_string) # ,record=1) if err != 0: err_string = "Error calculating " + calc_string - raise RuntimeError(err_string) + raise RuntimeError(err_string) # pragma: no cover return err == 0 def _shear_force_xyz_rtz( @@ -243,8 +243,8 @@ def _shear_force_xyz_rtz( ensvar_values = [v for v in values] _shear_var_obj = ensvar_values[0] - if not pobj_list: - raise RuntimeError("Error, no part provided") + if not pobj_list: # pragma: no cover + raise RuntimeError("Error, no part provided") # pragma: no cover # # select all parts in list # @@ -263,8 +263,8 @@ def _shear_force_xyz_rtz( # "Normal" function in the variable calculator. # temp_string = "ENS_Force_Norm = Normal(plist)" - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # # makes a new elem var if input var is nodal # @@ -290,8 +290,8 @@ def _shear_force_xyz_rtz( + shear_var_name + ",ENS_Force_Norm)" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # multiplying this DOT product by the surface normal vector produces # the normal component of the shear stress vector. @@ -303,8 +303,8 @@ def _shear_force_xyz_rtz( + stemp_string + "_Norm*ENS_Force_Norm" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # # The tangential component is now computed by subtracting this normal # component from the shear stress vector, or Vt = V - Vn, @@ -318,8 +318,8 @@ def _shear_force_xyz_rtz( + "-ENS_Force_NomalShear" + stemp_string ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # # Decompose the TangentialShearStress Vector into its x, y, z component of # TangentialShearStress_X, TangentialShearStress_Y, and TangentialShearStress_Z @@ -330,8 +330,8 @@ def _shear_force_xyz_rtz( + stemp_string + "[X]" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_TangentialShear" @@ -340,8 +340,8 @@ def _shear_force_xyz_rtz( + stemp_string + "[Y]" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_TangentialShear" @@ -350,8 +350,8 @@ def _shear_force_xyz_rtz( + stemp_string + "[Z]" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # # @@ -362,57 +362,57 @@ def _shear_force_xyz_rtz( # Calculate the element area Scalar using the "EleSize function in the Variable Calculator # temp_string = "ENS_Force_ElementArea = EleSize(plist)" - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_Tan_ShearForce_X = ENS_Force_TangentialShear" + stemp_string + "_X*ENS_Force_ElementArea" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_Tan_ShearForce_Y = ENS_Force_TangentialShear" + stemp_string + "_Y*ENS_Force_ElementArea" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_Tan_ShearForce_Z = ENS_Force_TangentialShear" + stemp_string + "_Z*ENS_Force_ElementArea" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover else: temp_string = ( "ENS_Force_Tan_ShearForce_X = ENS_Force_TangentialShear" + stemp_string + "_X" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_Tan_ShearForce_Y = ENS_Force_TangentialShear" + stemp_string + "_Y" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = ( "ENS_Force_Tan_ShearForce_Z = ENS_Force_TangentialShear" + stemp_string + "_Z" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover if frame_index > 0 and frame_index < len(self.ensight.objs.core.FRAMES): # remake the vector temp_string = "ENS_Force_Tan_ShearForce = MakeVect(plist, ENS_Force_Tan_ShearForce_X, ENS_Force_Tan_ShearForce_Y, ENS_Force_Tan_ShearForce_Z)" - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # resolve it in cylindrical coords temp_string = ( @@ -420,21 +420,21 @@ def _shear_force_xyz_rtz( + str(frame_index) + ")" ) - if not self._calc_var(_pobj_list, temp_string): - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover # Radial, theta , axial temp_string = "ENS_Force_Tan_ShearForce_R = ENS_Force_Tan_ShearForce_cyl[X]" - if not self._calc_var(_pobj_list, temp_string): # radial force - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = "ENS_Force_Tan_ShearForce_T = ENS_Force_Tan_ShearForce_cyl[Y]" - if not self._calc_var(_pobj_list, temp_string): # angular force - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover temp_string = "ENS_Force_Tan_ShearForce_A = ENS_Force_Tan_ShearForce_cyl[Z]" - if not self._calc_var(_pobj_list, temp_string): # axial force - return False + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + return False # pragma: no cover return True def _sum_shear_forces_xyz_rtz( @@ -467,8 +467,8 @@ def _sum_shear_forces_xyz_rtz( # # fcn_name = "sum_shear_forces_xyz_rtz" - if not pobj_list: - raise RuntimeError("Error, no part provided") + if not pobj_list: # pragma: no cover + raise RuntimeError("Error, no part provided") # pragma: no cover # # select all parts in list # @@ -479,19 +479,25 @@ def _sum_shear_forces_xyz_rtz( # # temp_string = "ENS_Force_Net_Tan_ShearForce_X = StatMoment(plist,ENS_Force_Tan_ShearForce_X, 0, Compute_Per_part)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Net_Tan_ShearForce_Y = StatMoment(plist,ENS_Force_Tan_ShearForce_Y, 0, Compute_Per_part)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Net_Tan_ShearForce_Z = StatMoment(plist,ENS_Force_Tan_ShearForce_Z, 0, Compute_Per_part)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover # # get the 3 constant force values XYZ # 10.1.6(b) use ens_utils, 10.2.0(d) Now gets all the per part constants in a list @@ -540,52 +546,70 @@ def _sum_shear_forces_xyz_rtz( # they are calc'd to give the user the totals. # temp_string = "ENS_Force_Total_Net_Tan_ShearForce_X = StatMoment(plist,ENS_Force_Tan_ShearForce_X, 0, Compute_Per_case)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Total_Net_Tan_ShearForce_Y = StatMoment(plist,ENS_Force_Tan_ShearForce_Y, 0, Compute_Per_case)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Total_Net_Tan_ShearForce_Z = StatMoment(plist,ENS_Force_Tan_ShearForce_Z, 0, Compute_Per_case)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover if frame_index > 0 and frame_index < len(self.ensight.objs.core.FRAMES): temp_string = "ENS_Force_Net_Tan_ShearForce_R = StatMoment(plist,ENS_Force_Tan_ShearForce_R,0, Compute_Per_part)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Net_Tan_ShearForce_T = StatMoment(plist,ENS_Force_Tan_ShearForce_T,0, Compute_Per_part)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Net_Tan_ShearForce_A = StatMoment(plist,ENS_Force_Tan_ShearForce_A,0, Compute_Per_part)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover # # Totals # temp_string = "ENS_Force_Total_Net_Tan_ShearForce_R = StatMoment(plist,ENS_Force_Tan_ShearForce_R,0, Compute_Per_case)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Total_Net_Tan_ShearForce_T = StatMoment(plist,ENS_Force_Tan_ShearForce_T,0, Compute_Per_case)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover temp_string = "ENS_Force_Total_Net_Tan_ShearForce_A = StatMoment(plist,ENS_Force_Tan_ShearForce_A,0, Compute_Per_case)" - if not self._calc_var(pobj_list, temp_string): - err_string = "Error, failed to calculate a variable in {}".format(fcn_name) - raise RuntimeError(err_string) + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + err_string = "Error, failed to calculate a variable in {}".format( + fcn_name + ) # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover # # get the 3 constant force values Radial, Theta, Axial # new use ens_utils 10.1.6(b) @@ -634,16 +658,18 @@ def _sum_shear_forces_xyz_rtz( for ii in range(len(pobj_list)): ret_val.append([Fx[ii], Fy[ii], Fz[ii], Fr[ii], Ft[ii], Fa[ii]]) return ret_val - else: - raise RuntimeError("Error getting ENS_Force_Net_Tan_ShearForce_R, T and/or A") + else: # pragma: no cover + raise RuntimeError( + "Error getting ENS_Force_Net_Tan_ShearForce_R, T and/or A" + ) # pragma: no cover else: # Only one frame or user picked frame 0 (None) for cylindrical frame calc if all([Fx, Fy, Fz]): ret_val = [] for ii in range(len(pobj_list)): ret_val.append([Fx[ii], Fy[ii], Fz[ii], 0.0, 0.0, 0.0]) return ret_val - else: - raise RuntimeError( + else: # pragma: no cover + raise RuntimeError( # pragma: no cover "Error getting Fx, Fy, and/or Fz Shear Net force per part constant values" ) @@ -738,28 +764,30 @@ def get_const_val( v_type=self.ensight.objs.enums.ENS_VAR_CONSTANT ): const_type = self.ensight.objs.enums.ENS_VAR_CONSTANT - else: - raise RuntimeError( + else: # pragma: no cover + raise RuntimeError( # pragma: no cover "Error, {} Constant name {} is not a constant nor a per part constant".format( ens_routine, const_name ) ) - else: - raise RuntimeError( + else: # pragma: no cover + raise RuntimeError( # pragma: no cover "Error, {} must supply a valid constant variable name ".format(ens_routine) ) elif isinstance(cname, self.ensight.objs.ENS_VAR): const_name = cname.DESCRIPTION const_type = cname.VARTYPEENUM - if ( + if ( # pragma: no cover const_type != self.ensight.objs.enums.ENS_VAR_CONSTANT and const_type != self.ensight.objs.enums.ENS_VAR_CONSTANT_PER_PART ): - raise RuntimeError( + raise RuntimeError( # pragma: no cover "Error, Variable {} is not a constant nor a per part constant".format(cname) ) - else: - raise RuntimeError("Error, 'get_const_val' Constant name is neither string nor ENS_VAR") + else: # pragma: no cover + raise RuntimeError( + "Error, 'get_const_val' Constant name is neither string nor ENS_VAR" + ) # pragma: no cover # # # Now get it @@ -784,8 +812,8 @@ def get_const_val( ret_val.append(None) else: ret_val.append(val) - else: - raise RuntimeError( + else: # pragma: no cover + raise RuntimeError( # pragma: no cover "Error {} part list must contain a list of only ENS_PARTs".format( ens_routine ) @@ -803,8 +831,8 @@ def get_const_val( return None else: return val - else: - raise RuntimeError( + else: # pragma: no cover + raise RuntimeError( # pragma: no cover "Error, {} return value from ensight.ensvariable indicates it is not a float from an Ensight Constant".format( ens_routine ) @@ -891,8 +919,8 @@ def _press_force_xyz_rtz( ensvar_values: List["ENS_VAR"] ensvar_values = [v for v in values] _press_var_obj = ensvar_values[0] - if not pobj_list: - raise RuntimeError("Error, no part provided") + if not pobj_list: # pragma: no cover + raise RuntimeError("Error, no part provided") # pragma: no cover # # select all parts in list # @@ -920,20 +948,20 @@ def _press_force_xyz_rtz( calc_string = "(plist," + press_var_name + ")" force_calc_string = "ENS_Force_press = Force" temp_string = force_calc_string + calc_string - if not self._calc_var(_pobj_list, temp_string): - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover temp_string = "ENS_Force_press_X = ENS_Force_press[X]" - if not self._calc_var(_pobj_list, temp_string): - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover temp_string = "ENS_Force_press_Y = ENS_Force_press[Y]" - if not self._calc_var(_pobj_list, temp_string): - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover temp_string = "ENS_Force_press_Z = ENS_Force_press[Z]" - if not self._calc_var(_pobj_list, temp_string): - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover # # RTZ Cylindrical force # @@ -941,20 +969,20 @@ def _press_force_xyz_rtz( temp_string = ( "ENS_Force_press_cyl = RectToCyl(plist,ENS_Force_press," + str(frame_index) + ")" ) - if not self._calc_var(_pobj_list, temp_string): - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover temp_string = "ENS_Force_press_R = ENS_Force_press_cyl[X]" - if not self._calc_var(_pobj_list, temp_string): # radial force - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover temp_string = "ENS_Force_press_T = ENS_Force_press_cyl[Y]" - if not self._calc_var(pobj_list, temp_string): # angular force - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover temp_string = "ENS_Force_press_A = ENS_Force_press_cyl[Z]" - if not self._calc_var(_pobj_list, temp_string): # axial force - raise RuntimeError("Error calculating: '" + temp_string + "'") + if not self._calc_var(_pobj_list, temp_string): # pragma: no cover + raise RuntimeError("Error calculating: '" + temp_string + "'") # pragma: no cover return True def _sum_pressure_forces_xyz_rtz( @@ -985,8 +1013,8 @@ def _sum_pressure_forces_xyz_rtz( # if not frame_index: frame_index = 0 - if not pobj_list: - raise RuntimeError("Error, no part provided") + if not pobj_list: # pragma: no cover + raise RuntimeError("Error, no part provided") # pragma: no cover # # Select the part(s) in the list # ensight.variables.evaluate("ENS_Force_Net_press_Y = StatMoment(plist,pressure,0,Compute_Per_part)") @@ -997,20 +1025,20 @@ def _sum_pressure_forces_xyz_rtz( force_calc_string = "ENS_Force_Net_press_X = StatMoment" calc_string = "(plist," + "ENS_Force_press_X , 0, Compute_Per_part )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Net_press_Y = StatMoment" calc_string = "(plist," + "ENS_Force_press_Y , 0, Compute_Per_part )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Net_press_Z = StatMoment" calc_string = "(plist," + "ENS_Force_press_Z , 0, Compute_Per_part )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # # Calculate the Total force X, Y, and Z , 10.2.0(d) now case constant variable # Totals are a case constants. We don't do anything with these vars @@ -1019,20 +1047,20 @@ def _sum_pressure_forces_xyz_rtz( force_calc_string = "ENS_Force_Total_Net_press_X = StatMoment" calc_string = "(plist," + "ENS_Force_press_X , 0, Compute_Per_case )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Total_Net_press_Y = StatMoment" calc_string = "(plist," + "ENS_Force_press_Y , 0, Compute_Per_case )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Total_Net_press_Z = StatMoment" calc_string = "(plist," + "ENS_Force_press_Z , 0, Compute_Per_case )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # # get a list with a per part force, one for each part, new 10.1.6(b) # @@ -1088,20 +1116,20 @@ def _sum_pressure_forces_xyz_rtz( force_calc_string = "ENS_Force_Net_press_R = StatMoment" calc_string = "(plist," + "ENS_Force_press_R, 0, Compute_Per_part )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Net_press_T = StatMoment" calc_string = "(plist," + "ENS_Force_press_T, 0, Compute_Per_part )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Net_press_A = StatMoment" calc_string = "(plist," + "ENS_Force_press_A, 0, Compute_Per_part )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # # Totals are a case constants. We don't do anything with these vars # they are calc'd to give the user the totals. @@ -1109,20 +1137,20 @@ def _sum_pressure_forces_xyz_rtz( force_calc_string = "ENS_Force_Total_Net_press_R = StatMoment" calc_string = "(plist," + "ENS_Force_press_R, 0, Compute_Per_case )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Total_Net_press_T = StatMoment" calc_string = "(plist," + "ENS_Force_press_T, 0, Compute_Per_case )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # force_calc_string = "ENS_Force_Total_Net_press_A = StatMoment" calc_string = "(plist," + "ENS_Force_press_A, 0, Compute_Per_case )" temp_string = force_calc_string + calc_string - if not self._calc_var(pobj_list, temp_string): - return None + if not self._calc_var(pobj_list, temp_string): # pragma: no cover + return None # pragma: no cover # # get a list with a per part force, one for each part, new 10.1.6(b) # @@ -1169,18 +1197,16 @@ def _sum_pressure_forces_xyz_rtz( ret_val.append([Fx[ii], Fy[ii], Fz[ii], Fr[ii], Ft[ii], Fa[ii]]) return ret_val else: - err_string = "Error getting XYZ and/or Cylindrical RTZ Pressure Net Force per part constant values" - raise RuntimeError(err_string) + err_string = "Error getting XYZ and/or Cylindrical RTZ Pressure Net Force per part constant values" # pragma: no cover + raise RuntimeError(err_string) # pragma: no cover else: # either only one Frame or Frame 0 has been chosen so no cylindrical calc if all([Fx, Fy, Fz]): ret_val = [] for ii in range(len(pobj_list)): ret_val.append([Fx[ii], Fy[ii], Fz[ii], 0.0, 0.0, 0.0]) return ret_val - else: - err_string = ( - "Error getting Fx, Fy, and/or Fz Pressure Net force per part constant values" - ) + else: # pragma: no cover + err_string = "Error getting Fx, Fy, and/or Fz Pressure Net force per part constant values" # pragma: no cover # pragma: no cover raise RuntimeError(err_string) def _write_out_force_data( @@ -1515,11 +1541,11 @@ def _write_out_force_data( # FIX ME keep track of and write out totals here when loop is done on last line? fp.close() return True - except IOError: - raise RuntimeError( + except IOError: # pragma: no cover + raise RuntimeError( # pragma: no cover "Error Failed to open output csv filename for writing '" + filename + "'" - ) - raise RuntimeError("Error no pressure force list to write out") + ) # pragma: no cover + raise RuntimeError("Error no pressure force list to write out") # pragma: no cover @staticmethod def _force_coeffs( @@ -1580,8 +1606,8 @@ def _get_up_vec(up_str: str) -> np.array: up_vec = [0.0, -1.0, 0.0] elif up_str == "-Z": up_vec = [0.0, 0.0, -1.0] - else: - raise RuntimeError( + else: # pragma: no cover + raise RuntimeError( # pragma: no cover f"Up vector {up_str} not allowed. It can only be +X, +Y, +Z, -X, -Y, -Z." ) return np.array(up_vec) diff --git a/tests/example_tests/test_coverage_increase.py b/tests/example_tests/test_coverage_increase.py index cb62e067b33..c0d830e846c 100644 --- a/tests/example_tests/test_coverage_increase.py +++ b/tests/example_tests/test_coverage_increase.py @@ -1,7 +1,7 @@ import os import pathlib -from ansys.pyensight.core import DockerLauncher, LocalLauncher +from ansys.pyensight.core import DockerLauncher, LocalLauncher, launch_ensight import pytest @@ -60,6 +60,19 @@ def test_coverage_increase(tmpdir, pytestconfig: pytest.Config): session.jupyter_notebook = False assert session.sos is False session.load_example("waterbreak.ens", root=root) + session.ensight.utils.variables._check_for_var_elem( + "alpha1", session.ensight.objs.core.PARTS["default_region"[0]] + ) + session.ensight.utils.variables._move_var_to_elem( + session.ensight.objs.core.PARTS, session.ensight.objs.core.VARIABLES["alpha1"][0] + ) + session.ensight.utils.variables._calc_var(None, None) + try: + session.ensight.utils.variables._calc_var( + session.ensight.objs.core.PARTS["default_region"[0]], "test" + ) + except RuntimeError: + pass core = session.ensight.objs.core core.PARTS.set_attr("COLORBYPALETTE", "alpha1") export = session.ensight.utils.export @@ -253,3 +266,128 @@ def test_coverage_increase(tmpdir, pytestconfig: pytest.Config): assert False except Exception: pass + assert session.grpc.port() == session._grpc_port + assert session.grpc.host == session._hostname + assert session.grpc.security_token == session._secret_key + session.close() + + +def test_particle_traces_and_geometry(tmpdir, pytestconfig: pytest.Config): + data_dir = tmpdir.mkdir("datadir") + use_local = pytestconfig.getoption("use_local_launcher") + root = None + if use_local: + launcher = LocalLauncher(enable_rest_api=True) + root = "http://s3.amazonaws.com/www3.ensight.com/PyEnSight/ExampleData" + else: + launcher = DockerLauncher(data_directory=data_dir, use_dev=True, enable_rest_api=True) + session = launcher.start() + session.load_example("waterbreak.ens", root=root) + parts = session.ensight.utils.parts + export = session.ensight.utils.export + session.ensight.objs.core.PARTS.set_attr("COLORBYPALETTE", "alpha1") + export.geometry("test.glb", format=export.GEOM_EXPORT_GLTF, starting_timestep=0) + export.geometry( + "second_test.glb", format=export.GEOM_EXPORT_GLTF, starting_timestep=0, frames=-1 + ) + assert os.path.exists("test.glb") + for i in range(20): + assert os.path.exists(f"second_test{str(i).zfill(3)}.glb") + point_pt = parts.create_particle_trace_from_points( + "test", "u", points=[[0.01, 0.1, 0]], source_parts=parts.select_parts_by_dimension(3) + ) + line_pt = parts.create_particle_trace_from_line( + "test2", + "u", + point1=[0.01, 0.1, -0.02], + point2=[0.01, 0.1, 0.02], + source_parts=parts.select_parts_by_dimension(3), + pathlines=True, + emit_time=0.0, + total_time=1.0, + delta_time=0.025, + num_points=3, + ) + plane_pt = parts.create_particle_trace_from_plane( + "test3", + 5, + direction=parts.PT_NEG_TIME, + source_parts=parts.select_parts_by_dimension(3), + point1=[0.5, 0.2, 0.013], + point2=[0.5, 0.2, -0.002], + point3=[0.5, 0.35, -0.002], + num_points_x=4, + num_points_y=5, + ) + part_pt = parts.create_particle_trace_from_parts( + "test4", + "u", + source_parts=parts.select_parts_by_dimension(3), + parts=["leftWall"], + num_points=10, + ) + parts.create_particle_trace_from_parts( + "test10", + "u", + source_parts=parts.select_parts_by_dimension(3), + parts=["leftWall"], + num_points=10, + surface_restrict=True, + ) + parts.add_emitter_parts_to_particle_trace_part( + part_pt, parts=["lowerWall"], num_points=5, part_distribution_type=parts.PART_EMIT_FROM_AREA + ) + parts.add_emitter_points_to_particle_trace_part( + point_pt, + points=[[0.02, 0.1, 0]], + ) + parts.add_emitter_line_to_particle_trace_part( + line_pt, point1=[0.02, 0.1, -0.02], point2=[0.02, 0.1, 0.02], num_points=3 + ) + parts.add_emitter_plane_to_particle_trace_part( + plane_pt, + point1=[0.3, 0.2, 0.013], + point2=[0.3, 0.2, -0.002], + point3=[0.3, 0.35, -0.002], + num_points_x=4, + num_points_y=5, + ) + + +def test_sos(tmpdir, pytestconfig: pytest.Config): + data_dir = tmpdir.mkdir("datadir") + use_local = pytestconfig.getoption("use_local_launcher") + is_docker = False + if use_local: + launcher = LocalLauncher(use_sos=2) + else: + is_docker = True + launcher = DockerLauncher(data_directory=data_dir, use_dev=True, use_sos=2) + session = launcher.start() + session.load_data(f"{session.cei_home}/ensight{session.cei_suffix}/data/cube/cube.case") + assert session.grpc.port() == session._grpc_port + assert session.grpc.host == session._hostname + assert session.grpc.security_token == session._secret_key + session.close() + if is_docker: + session = launch_ensight(use_docker=True, use_dev=True, data_directory=data_dir) + assert session._launcher._enshell.host() == session._hostname + session._launcher._enshell.port() + session._launcher._enshell.security_token() + session._launcher._enshell.metadata() + _parts = session.ensight.objs.core.PARTS + session.ensight.utils.parts.get_part_id_obj_name(_parts, "id") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "name") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "obj") + _parts = [p.ID for p in session.ensight.objs.core.PARTS] + session.ensight.utils.parts.get_part_id_obj_name(_parts, "id") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "name") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "obj") + _parts = [f"{p.ID}" for p in session.ensight.objs.core.PARTS] + session.ensight.utils.parts.get_part_id_obj_name(_parts, "id") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "name") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "obj") + _parts = [p.DESCRIPTION for p in session.ensight.objs.core.PARTS] + session.ensight.utils.parts.get_part_id_obj_name(_parts, "id") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "name") + session.ensight.utils.parts.get_part_id_obj_name(_parts, "obj") diff --git a/tests/example_tests/test_force_tool.py b/tests/example_tests/test_force_tool.py index 1b30b2d9c72..3bc71027b0a 100644 --- a/tests/example_tests/test_force_tool.py +++ b/tests/example_tests/test_force_tool.py @@ -51,7 +51,6 @@ def test_force_tool(tmpdir, pytestconfig: pytest.Config): launcher = DockerLauncher(data_directory=data_dir, use_dev=True) session = launcher.start() path = session.download_pyansys_example("RC_Plane", "pyensight", folder=True) - print(path) session.load_data(os.path.join(path, "extra300_RC_Plane_cpp.case")) create_frame(session.ensight) body_parts = [ @@ -68,6 +67,7 @@ def test_force_tool(tmpdir, pytestconfig: pytest.Config): vref = session.ensight.utils.variables.get_const_val("Vinf") dref = session.ensight.utils.variables.get_const_val("Rho") aoa = session.ensight.utils.variables.get_const_val("AoA") + session.ensight.utils.variables.get_const_val(session.ensight.objs.core.VARIABLES["AoA"][0]) vxref = vref * math.cos(aoa * math.pi / 180) vyref = vref * math.sin(aoa * math.pi / 180) vzref = 0 @@ -91,4 +91,32 @@ def test_force_tool(tmpdir, pytestconfig: pytest.Config): up_vector=session.ensight.utils.variables.UP_VECTOR_PLUS_Y, frame_index=1, ) + session.ensight.utils.variables.compute_forces( + pobj_list=body_parts.copy(), + press_var_obj="staticPressure", + shear_var_obj="wallShearStress", + shear_var_type=session.ensight.utils.variables.SHEAR_VAR_TYPE_STRESS, + export_filename="test.csv", + area_ref=area_ref, + density_ref=dref, + velocity_x_ref=vxref, + velocity_y_ref=vyref, + velocity_z_ref=vzref, + up_vector=session.ensight.utils.variables.UP_VECTOR_PLUS_Y, + frame_index=1, + ) + session.ensight.utils.variables.compute_forces( + pobj_list=body_parts.copy(), + press_var_obj="staticPressure", + shear_var_obj="wallShearStress", + shear_var_type=session.ensight.utils.variables.SHEAR_VAR_TYPE_STRESS, + export_filename="test2.csv", + area_ref=area_ref, + density_ref=dref, + velocity_x_ref=vxref, + velocity_y_ref=vyref, + velocity_z_ref=vzref, + up_vector=session.ensight.utils.variables.UP_VECTOR_PLUS_Y, + ) assert os.path.exists("test.csv") + assert os.path.exists("test2.csv")