Skip to content

Commit

Permalink
Mostieri/shared memory interface (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariostieriansys authored Nov 25, 2024
1 parent 9598e72 commit 7956fa8
Showing 1 changed file with 302 additions and 0 deletions.
302 changes: 302 additions & 0 deletions src/ansys/pyensight/core/ensight_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
interface to the EnSight gRPC interface, including event streams.
"""
from concurrent import futures
import os
import platform
import sys
import tempfile
import threading
from typing import Any, Callable, List, Optional, Tuple, Union
import uuid
Expand Down Expand Up @@ -47,6 +52,14 @@ def __init__(self, host: str = "127.0.0.1", port: int = 12345, secret_key: str =
# Callback for events (self._events not used)
self._event_callback: Optional[Callable] = None
self._prefix: Optional[str] = None
self._shmem_module = None
self._shmem_filename: Optional[str] = None
self._shmem_client = None
self._image_stream = None
self._image_thread = None
self._image = None
self._image_number = 0
self._sub_service = None

@property
def host(self) -> str:
Expand Down Expand Up @@ -120,6 +133,12 @@ def shutdown(self, stop_ensight: bool = False, force: bool = False) -> None:
if self._channel:
self._channel.close()
self._channel = None
if self._shmem_client:
if self._shmem_module:
self._shmem_module.stream_destroy(self._shmem_client)
else:
self.command("ensight_grpc_shmem.stream_destroy(enscl._shmem_client)")
self._shmem_client = None

def is_connected(self) -> bool:
"""Check to see if the gRPC connection is live
Expand Down Expand Up @@ -430,3 +449,286 @@ def _poll_events(self) -> None:
# signal that the gRPC connection has broken
self._event_stream = None
self._event_thread = None

def _attempt_shared_mem_import(self):
try:
import ensight_grpc_shmem

self._shmem_module = ensight_grpc_shmem
except ModuleNotFoundError:
try:
self.command("import enve", do_eval=False)
cei_home = eval(self.command("enve.home()"))
self.command("import ceiversion", do_eval=False)
cei_version = eval(self.command("ceiversion.version_suffix"))
self.command("import sys", do_eval=False)
py_version = eval(self.command("sys.version_info[:3]"))
is_win = True if "Win" in platform.system() else False
plat = "win64" if is_win else "linux_2.6_64"
_lib = "DLLs" if is_win else f"lib/python{py_version[0]}.{py_version[1]}"
dll_loc = os.path.join(
cei_home,
f"apex{cei_version}",
"machines",
plat,
f"Python-{py_version[0]}.{py_version[1]}.{py_version[2]}",
_lib,
)
if os.path.exists(dll_loc):
sys.path.append(dll_loc)
import ensight_grpc_shmem

self._shmem_module = ensight_grpc_shmem
except ModuleNotFoundError:
pass

@classmethod
def _find_filename(cls, size=1024 * 1024 * 25):
"""Create a file on disk to support shared memory transport.
A file, 25MB in size, will be created using the pid of the current
process to generate the filename. It will be located in a temporary
directory.
"""
tempdir = tempfile.mkdtemp(prefix="pyensight_shmem")
for i in range(100):
filename = os.path.join(tempdir, "shmem_{}.bin".format(os.getpid() + i))
if not os.path.exists(filename):
try:
tmp = open(filename, "wb")
tmp.write(b"\0" * size) # 25MB
tmp.close()
return filename
except Exception:
pass
return None

def get_image(self):
"""Retrieve the current EnSight image.
When any of the image streaming systems is enabled, Python threads will receive the
most recent image and store them in this instance. The frame stored in this instance
can be accessed by calling this method
Returns
-------
(tuple):
A tuple containing a dictionary defining the image binary
(pixels=bytearray, width=w, height=h) and the image frame number.
"""
return self._image, self._image_number

def _start_sub_service(self):
"""Start a gRPC client service.
When the client calls one subscribe_events() or subscribe_images() with the
connection set to GRPC, the interface requires the client to start a gRPC server
that EnSight will call back to with event/image messages. This method starts
such a gRPC server."""
try:
if self._sub_service is not None:
return
self._sub_service = _EnSightSubServicer(parent=self)
self._sub_service.start()
except Exception:
self._sub_service = None

def subscribe_images(self, flip_vertical=False, use_shmem=True):
"""Subscribe to an image stream.
This methond makes a EnSightService::SubscribeImages() gRPC call. If
use_shmem is False, the transport system will be made over gRPC. It causes
EnSight to make a reverse gRPC connection over with gRPC calls with the
various images will be made. If use_shmem is True (the default), the \ref shmem will be used.
Parameters
---------
flip_vertical: bool
If True, the image pixels will be flipped over the X axis
use_shmem: bool
If True, use the shared memory transport, otherwise use reverse gRPC"""
self.connect()
if use_shmem:
try:
# we need a shared memory file
self._shmem_filename = self._find_filename()
if self._shmem_filename is not None:
conn_type = ensight_pb2.SubscribeImageOptions.SHARED_MEM
options = dict(filename=self._shmem_filename)
image_options = ensight_pb2.SubscribeImageOptions(
prefix=self.prefix(),
type=conn_type,
options=options,
flip_vertical=flip_vertical,
chunk=False,
)
_ = self._stub.SubscribeImages(image_options, metadata=self._metadata())
# start the local server
if not self._shmem_module:
self._attempt_shared_mem_import()
if self._shmem_module:
self._shmem_client = self._shmem_module.stream_create(self._shmem_filename)
else:
self.command("import ensight_grpc_shmem", do_eval=False)
to_send = self._shmem_filename.replace("\\", "\\\\")
self.command(
f"enscl._shmem_client = ensight_grpc_shmem.stream_create('{to_send}')",
do_eval=False,
)
if self.command("enscl._shmem_client is not None"):
self._shmem_client = True

# turn on the polling thread
self._image_thread = threading.Thread(target=self._poll_images)
self._image_thread.daemon = True
self._image_thread.start()
return
except Exception as e:
print("Unable to subscribe to an image stream via shared memory: {}".format(str(e)))

self._start_sub_service()
conn_type = ensight_pb2.SubscribeImageOptions.GRPC
options = {}
if self._sub_service:
options = dict(uri=self._sub_service._uri)
image_options = ensight_pb2.SubscribeImageOptions(
prefix=self.prefix(),
type=conn_type,
options=options,
flip_vertical=flip_vertical,
chunk=True,
)
_ = self._stub.SubscribeImages(image_options, metadata=self._metadata())

def image_stream_enable(self, flip_vertical=False):
"""Enable a simple gRPC-based image stream from EnSight.
This method makes a EnSightService::GetImageStream() gRPC call into EnSight, returning
an ensightservice::ImageReply stream. The method creates a thread to hold this
stream open and read new image frames from it. The thread places the read images
in this object. An external application can retrieve the most recent one using
get_image().
Parameters
----------
flip_vertical: bool
If True, the image will be flipped over the X axis before being sent from EnSight."""
if self._image_stream is not None:
return
self.connect()
self._image_stream = self._stub.GetImageStream(
ensight_pb2.ImageStreamRequest(flip_vertical=flip_vertical, chunk=True),
metadata=self._metadata(),
)
self._image_thread = threading.Thread(target=self._poll_images)
self._image_thread.daemon = True
self._image_thread.start()

def _put_image(self, the_image):
"""Store an image on this instance.
This method is used by threads to store the latest image they receive
so it can be accessed by get_image.
"""
self._image = the_image
self._image_number += 1

def image_stream_is_enabled(self):
"""Check to see if the image stream is enabled.
If an image stream has been successfully established via image_stream_enable(),
then this function returns True.
Returns
-------
(bool):
True if a ensightservice::ImageReply steam is active
"""
return self._image_stream is not None

def _poll_images(self):
"""Handle image streams.
This method is called by a Python thread to read imagery via the shared memory
transport system or the the ensightservice::ImageReply stream.
"""
try:
while self._stub is not None:
if self._shmem_client:
if self._shmem_module:
img = self._shmem_module.stream_lock(self._shmem_client)
else:
img = self.command("ensight_grpc_shmem.stream_lock(enscl._shmem_client)")
if type(img) is dict:
the_image = dict(
pixels=img["pixeldata"], width=img["width"], height=img["height"]
)
self._put_image(the_image)
if self._shmem_module:
self._shmem_module.stream_unlock(self._shmem_client)
else:
self.command(
"ensight_grpc_shmem.stream_unlock(enscl._shmem_client)",
do_eval=False,
)

if self._image_stream is not None:
img = self._image_stream.next()
buffer = img.pixels

while not img.final:
img = self._image_stream.next()
buffer += img.pixels

the_image = dict(pixels=buffer, width=img.width, height=img.height)
self._put_image(the_image)
except Exception:
# signal that the gRPC connection has broken
self._image_stream = None
self._image_thread = None
self._image = None


class _EnSightSubServicer(ensight_pb2_grpc.EnSightSubscriptionServicer):
"""Internal class handling reverse subscription connections.
The EnSight gRPC interface has a mechanism for reversing the gRPC
streams called Subscriptions. Image and event streams can be
subscribed to. In this mode, the client application starts a
gRPC server that implements the EnSightSubscription protocol.
EnSight will connect back to the client using this protocol and
send images/events back to the client as regular (non-stream)
rpc calls. This can be useful in situations where it is difficult
keep a long-running stream alive.
The EnSightSubServicer class implements a gRPC server for the client application.
"""

def __init__(self, parent: Optional["EnSightGRPC"] = None):
self._server: Optional["grpc.Server"] = None
self._uri: str = ""
self._parent = parent

def PublishEvent(self, request: Any, context: Any) -> "ensight_pb2.GenericResponse":
"""Publish an event to the remote server."""
if self._parent is not None:
self._parent._put_event(request)
return ensight_pb2.GenericResponse(str="Event Published")

def PublishImage(self, request_iterator: Any, context: Any) -> "ensight_pb2.GenericResponse":
"""Publish a single image (possibly in chucks) to the remote server."""
img: Any = request_iterator.next()
buffer = img.pixels
while not img.final:
img = request_iterator.next()
buffer += img.pixels
the_image = dict(pixels=buffer, width=img.width, height=img.height)
if self._parent is not None:
self._parent._put_image(the_image)
return ensight_pb2.GenericResponse(str="Image Published")

def start(self):
"""Start the gRPC server to be used for the EnSight Subscription Service."""
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
ensight_pb2_grpc.add_EnSightSubscriptionServicer_to_server(self, self._server)
# Start the server on localhost with a random port
port = self._server.add_insecure_port("localhost:0")
self._uri = "localhost:" + str(port)
self._server.start()

0 comments on commit 7956fa8

Please sign in to comment.