diff --git a/src/ansys/pyensight/core/ensight_grpc.py b/src/ansys/pyensight/core/ensight_grpc.py index 855770668c..dce5efb492 100644 --- a/src/ansys/pyensight/core/ensight_grpc.py +++ b/src/ansys/pyensight/core/ensight_grpc.py @@ -4,6 +4,11 @@ interface to the EnSight gRPC interface, including event streams. """ +from concurrent import futures +import os +import platform +import sys +import tempfile import threading from typing import Any, Callable, List, Optional, Tuple, Union import uuid @@ -47,6 +52,14 @@ def __init__(self, host: str = "127.0.0.1", port: int = 12345, secret_key: str = # Callback for events (self._events not used) self._event_callback: Optional[Callable] = None self._prefix: Optional[str] = None + self._shmem_module = None + self._shmem_filename: Optional[str] = None + self._shmem_client = None + self._image_stream = None + self._image_thread = None + self._image = None + self._image_number = 0 + self._sub_service = None @property def host(self) -> str: @@ -120,6 +133,12 @@ def shutdown(self, stop_ensight: bool = False, force: bool = False) -> None: if self._channel: self._channel.close() self._channel = None + if self._shmem_client: + if self._shmem_module: + self._shmem_module.stream_destroy(self._shmem_client) + else: + self.command("ensight_grpc_shmem.stream_destroy(enscl._shmem_client)") + self._shmem_client = None def is_connected(self) -> bool: """Check to see if the gRPC connection is live @@ -430,3 +449,286 @@ def _poll_events(self) -> None: # signal that the gRPC connection has broken self._event_stream = None self._event_thread = None + + def _attempt_shared_mem_import(self): + try: + import ensight_grpc_shmem + + self._shmem_module = ensight_grpc_shmem + except ModuleNotFoundError: + try: + self.command("import enve", do_eval=False) + cei_home = eval(self.command("enve.home()")) + self.command("import ceiversion", do_eval=False) + cei_version = eval(self.command("ceiversion.version_suffix")) + self.command("import sys", do_eval=False) + py_version = eval(self.command("sys.version_info[:3]")) + is_win = True if "Win" in platform.system() else False + plat = "win64" if is_win else "linux_2.6_64" + _lib = "DLLs" if is_win else f"lib/python{py_version[0]}.{py_version[1]}" + dll_loc = os.path.join( + cei_home, + f"apex{cei_version}", + "machines", + plat, + f"Python-{py_version[0]}.{py_version[1]}.{py_version[2]}", + _lib, + ) + if os.path.exists(dll_loc): + sys.path.append(dll_loc) + import ensight_grpc_shmem + + self._shmem_module = ensight_grpc_shmem + except ModuleNotFoundError: + pass + + @classmethod + def _find_filename(cls, size=1024 * 1024 * 25): + """Create a file on disk to support shared memory transport. + + A file, 25MB in size, will be created using the pid of the current + process to generate the filename. It will be located in a temporary + directory. + """ + tempdir = tempfile.mkdtemp(prefix="pyensight_shmem") + for i in range(100): + filename = os.path.join(tempdir, "shmem_{}.bin".format(os.getpid() + i)) + if not os.path.exists(filename): + try: + tmp = open(filename, "wb") + tmp.write(b"\0" * size) # 25MB + tmp.close() + return filename + except Exception: + pass + return None + + def get_image(self): + """Retrieve the current EnSight image. + + When any of the image streaming systems is enabled, Python threads will receive the + most recent image and store them in this instance. The frame stored in this instance + can be accessed by calling this method + + Returns + ------- + (tuple): + A tuple containing a dictionary defining the image binary + (pixels=bytearray, width=w, height=h) and the image frame number. + """ + return self._image, self._image_number + + def _start_sub_service(self): + """Start a gRPC client service. + When the client calls one subscribe_events() or subscribe_images() with the + connection set to GRPC, the interface requires the client to start a gRPC server + that EnSight will call back to with event/image messages. This method starts + such a gRPC server.""" + try: + if self._sub_service is not None: + return + self._sub_service = _EnSightSubServicer(parent=self) + self._sub_service.start() + except Exception: + self._sub_service = None + + def subscribe_images(self, flip_vertical=False, use_shmem=True): + """Subscribe to an image stream. + + This methond makes a EnSightService::SubscribeImages() gRPC call. If + use_shmem is False, the transport system will be made over gRPC. It causes + EnSight to make a reverse gRPC connection over with gRPC calls with the + various images will be made. If use_shmem is True (the default), the \ref shmem will be used. + + Parameters + --------- + flip_vertical: bool + If True, the image pixels will be flipped over the X axis + use_shmem: bool + If True, use the shared memory transport, otherwise use reverse gRPC""" + self.connect() + if use_shmem: + try: + # we need a shared memory file + self._shmem_filename = self._find_filename() + if self._shmem_filename is not None: + conn_type = ensight_pb2.SubscribeImageOptions.SHARED_MEM + options = dict(filename=self._shmem_filename) + image_options = ensight_pb2.SubscribeImageOptions( + prefix=self.prefix(), + type=conn_type, + options=options, + flip_vertical=flip_vertical, + chunk=False, + ) + _ = self._stub.SubscribeImages(image_options, metadata=self._metadata()) + # start the local server + if not self._shmem_module: + self._attempt_shared_mem_import() + if self._shmem_module: + self._shmem_client = self._shmem_module.stream_create(self._shmem_filename) + else: + self.command("import ensight_grpc_shmem", do_eval=False) + to_send = self._shmem_filename.replace("\\", "\\\\") + self.command( + f"enscl._shmem_client = ensight_grpc_shmem.stream_create('{to_send}')", + do_eval=False, + ) + if self.command("enscl._shmem_client is not None"): + self._shmem_client = True + + # turn on the polling thread + self._image_thread = threading.Thread(target=self._poll_images) + self._image_thread.daemon = True + self._image_thread.start() + return + except Exception as e: + print("Unable to subscribe to an image stream via shared memory: {}".format(str(e))) + + self._start_sub_service() + conn_type = ensight_pb2.SubscribeImageOptions.GRPC + options = {} + if self._sub_service: + options = dict(uri=self._sub_service._uri) + image_options = ensight_pb2.SubscribeImageOptions( + prefix=self.prefix(), + type=conn_type, + options=options, + flip_vertical=flip_vertical, + chunk=True, + ) + _ = self._stub.SubscribeImages(image_options, metadata=self._metadata()) + + def image_stream_enable(self, flip_vertical=False): + """Enable a simple gRPC-based image stream from EnSight. + + This method makes a EnSightService::GetImageStream() gRPC call into EnSight, returning + an ensightservice::ImageReply stream. The method creates a thread to hold this + stream open and read new image frames from it. The thread places the read images + in this object. An external application can retrieve the most recent one using + get_image(). + + Parameters + ---------- + flip_vertical: bool + If True, the image will be flipped over the X axis before being sent from EnSight.""" + if self._image_stream is not None: + return + self.connect() + self._image_stream = self._stub.GetImageStream( + ensight_pb2.ImageStreamRequest(flip_vertical=flip_vertical, chunk=True), + metadata=self._metadata(), + ) + self._image_thread = threading.Thread(target=self._poll_images) + self._image_thread.daemon = True + self._image_thread.start() + + def _put_image(self, the_image): + """Store an image on this instance. + + This method is used by threads to store the latest image they receive + so it can be accessed by get_image. + """ + self._image = the_image + self._image_number += 1 + + def image_stream_is_enabled(self): + """Check to see if the image stream is enabled. + + If an image stream has been successfully established via image_stream_enable(), + then this function returns True. + + Returns + ------- + (bool): + True if a ensightservice::ImageReply steam is active + """ + return self._image_stream is not None + + def _poll_images(self): + """Handle image streams. + + This method is called by a Python thread to read imagery via the shared memory + transport system or the the ensightservice::ImageReply stream. + """ + try: + while self._stub is not None: + if self._shmem_client: + if self._shmem_module: + img = self._shmem_module.stream_lock(self._shmem_client) + else: + img = self.command("ensight_grpc_shmem.stream_lock(enscl._shmem_client)") + if type(img) is dict: + the_image = dict( + pixels=img["pixeldata"], width=img["width"], height=img["height"] + ) + self._put_image(the_image) + if self._shmem_module: + self._shmem_module.stream_unlock(self._shmem_client) + else: + self.command( + "ensight_grpc_shmem.stream_unlock(enscl._shmem_client)", + do_eval=False, + ) + + if self._image_stream is not None: + img = self._image_stream.next() + buffer = img.pixels + + while not img.final: + img = self._image_stream.next() + buffer += img.pixels + + the_image = dict(pixels=buffer, width=img.width, height=img.height) + self._put_image(the_image) + except Exception: + # signal that the gRPC connection has broken + self._image_stream = None + self._image_thread = None + self._image = None + + +class _EnSightSubServicer(ensight_pb2_grpc.EnSightSubscriptionServicer): + """Internal class handling reverse subscription connections. + The EnSight gRPC interface has a mechanism for reversing the gRPC + streams called Subscriptions. Image and event streams can be + subscribed to. In this mode, the client application starts a + gRPC server that implements the EnSightSubscription protocol. + EnSight will connect back to the client using this protocol and + send images/events back to the client as regular (non-stream) + rpc calls. This can be useful in situations where it is difficult + keep a long-running stream alive. + The EnSightSubServicer class implements a gRPC server for the client application. + """ + + def __init__(self, parent: Optional["EnSightGRPC"] = None): + self._server: Optional["grpc.Server"] = None + self._uri: str = "" + self._parent = parent + + def PublishEvent(self, request: Any, context: Any) -> "ensight_pb2.GenericResponse": + """Publish an event to the remote server.""" + if self._parent is not None: + self._parent._put_event(request) + return ensight_pb2.GenericResponse(str="Event Published") + + def PublishImage(self, request_iterator: Any, context: Any) -> "ensight_pb2.GenericResponse": + """Publish a single image (possibly in chucks) to the remote server.""" + img: Any = request_iterator.next() + buffer = img.pixels + while not img.final: + img = request_iterator.next() + buffer += img.pixels + the_image = dict(pixels=buffer, width=img.width, height=img.height) + if self._parent is not None: + self._parent._put_image(the_image) + return ensight_pb2.GenericResponse(str="Image Published") + + def start(self): + """Start the gRPC server to be used for the EnSight Subscription Service.""" + self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + ensight_pb2_grpc.add_EnSightSubscriptionServicer_to_server(self, self._server) + # Start the server on localhost with a random port + port = self._server.add_insecure_port("localhost:0") + self._uri = "localhost:" + str(port) + self._server.start()