Skip to content

Commit

Permalink
feat: idsse-897 rabbitmq_utils Rpc (#87)
Browse files Browse the repository at this point in the history
* add rpc class to rabbitmq_utils
* add protections for Publisher/Consumer using default exch or queue
* bug fix: stop tracking Future for timed out/errored RPC requests
* rebuild Rpc unit tests for error cases
* Setting prefetch_count must be done before calling basic_consume.
* remove duplicative RpcResponse data class

---------

Co-authored-by: Geary Layne <[email protected]>
  • Loading branch information
mackenzie-grimes-noaa and Geary-Layne authored Dec 6, 2024
1 parent 52172d7 commit 46fce65
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 62 deletions.
159 changes: 146 additions & 13 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging.config
import uuid

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future, ThreadPoolExecutor
from collections.abc import Callable
from functools import partial
from threading import Event, Thread
Expand Down Expand Up @@ -91,9 +91,9 @@ class RabbitMqParamsAndCallback(NamedTuple):
callback: Callable


class PublishMessageParams(NamedTuple):
"""Data class to hold a RabbitMQ message, its properties, and optional route_key"""
message: str
class RabbitMqMessage(NamedTuple):
"""Data class to hold a RabbitMQ message body, properties, and optional route_key (if outbound)"""
body: str
properties: BasicProperties
route_key: str | None = None

Expand Down Expand Up @@ -248,7 +248,7 @@ def publish(self, message: bytes, properties: BasicProperties = None, route_key:
self.channel,
lambda: _publish(self.channel,
self._exch,
PublishMessageParams(message, properties, route_key),
RabbitMqMessage(message, properties, route_key),
self._queue)
)

Expand All @@ -272,7 +272,7 @@ def blocking_publish(self,
"""
return _blocking_publish(self.channel,
self._exch,
PublishMessageParams(message, properties, route_key),
RabbitMqMessage(message, properties, route_key),
self._queue)

def stop(self):
Expand All @@ -287,6 +287,139 @@ def stop(self):
self.connection.close)


class Rpc:
"""
RabbitMQ RPC (remote procedure call) client, runs in own thread to not block heartbeat.
The start() and stop() methods should be called from the same thread that created the instance.
This RPC class can be used to send "requests" (outbound messages) over RabbitMQ and block until
a "response" (inbound message) comes back from the receiving app. All producing to/consuming of
different queues, and associating requests with their responses, is abstracted away.
Note that RPC by RabbitMQ convention uses the built-in Direct Reply-To queue to field the
responses messages, rather than creating its own queue. Directing responses to a custom queue
is not yet supported by Rpc.
Example usage:
my_client = RpcClient(...insert params here...)
response = my_client.send_message('{"some": "json"}') # blocks while waiting for response
logger.info(f'Response from external service: {response}')
"""
def __init__(self, conn_params: Conn, exch: Exch, timeout: float | None = None):
"""
Args:
conn_params (Conn): parameters to connect to RabbitMQ server
exch (Exch): parameters of RMQ Exchange where messages should be sent
timeout (float | None): optional timeout to give up on receiving each response.
Default is None, meaning wait indefinitely for response from external RMQ service.
"""
self._exch = exch
self._timeout = timeout
# only publish to built-in Direct Reply-to queue (recommended for RPC, less setup needed)
self._queue = Queue(DIRECT_REPLY_QUEUE, '', True, False, False)

# worklist to track corr_ids sent to remote service, and associated response when it arrives
self._pending_requests: dict[str, Future] = {}

# Start long-running thread to consume any messages from response queue
self.consumer = Consumer(
conn_params,
RabbitMqParamsAndCallback(RabbitMqParams(Exch('', 'direct'), self._queue),
self._response_callback)
)

@property
def is_open(self) -> bool:
"""Returns True if RabbitMQ connection (Publisher) is open and ready to send messages"""
return self.consumer.is_alive() and self.consumer.channel.is_open

def send_request(self, request_body: str | bytes) -> RabbitMqMessage | None:
"""Send message to remote RabbitMQ service using thread-safe RPC. Will block until response
is received back, or timeout occurs.
Returns:
RabbitMqMessage | None: The response message (body and properties), or None on request
timeout or error handling response.
"""
if not self.is_open:
logger.debug('RPC thread not yet initialized. Setting up now')
self.start()

# generate unique ID to associate our request to external service's response
request_id = str(uuid.uuid4())

# send request to external RMQ service, providing the queue where it should respond
properties = BasicProperties(content_type='application/json',
correlation_id=request_id,
reply_to=self._queue.name)

# add future to dict where callback can retrieve it and set result
request_future = Future()
self._pending_requests[request_id] = request_future

logger.debug('Publishing request message to external service with body: %s', request_body)
_blocking_publish(self.consumer.channel,
self._exch,
RabbitMqMessage(request_body, properties, self._exch.route_key),
self._queue)

try:
# block until callback runs (we'll know when the future's result has been changed)
return request_future.result(timeout=self._timeout)
except TimeoutError:
logger.warning('Timed out waiting for response. correlation_id: %s', request_id)
self._pending_requests.pop(request_id) # stop tracking request Future
return None
except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning('Unexpected response from external service: %s', str(exc))
self._pending_requests.pop(request_id) # stop tracking request Future
return None

def start(self):
"""Start dedicated threads to asynchronously send and receive RPC messages using a new
RabbitMQ connection and channel. Note: this method can be called externally, but it is
not required to use the client. It will automatically call this internally as needed."""
if not self.is_open:
logger.debug('Starting RPC thread to send and consume messages')
self.consumer.start()

def stop(self):
"""Unsubscribe to Direct Reply-To queue and cleanup thread"""
logger.debug('Shutting down RPC threads')
if not self.is_open:
logger.debug('RPC threads not running, nothing to cleanup')
return

# tell Consumer cleanup RabbitMQ resources and wait for thread to terminate
self.consumer.stop()
self.consumer.join()

def _response_callback(
self,
channel: Channel,
method: Basic.Deliver,
properties: BasicProperties,
body: bytes
):
"""Handle RabbitMQ message emitted to response queue."""
logger.debug('Received response message with routing_key: %s, content_type: %s, message: %i',
method.routing_key, properties.content_type, str(body, encoding='utf-8'))

# remove future from pending list. we will update result shortly
request_future = self._pending_requests.pop(properties.correlation_id)

# messages sent through RabbitMQ Direct reply-to are auto acked
is_direct_reply = str(method.routing_key).startswith(DIRECT_REPLY_QUEUE)
if not is_direct_reply:
channel.basic_ack(delivery_tag=method.delivery_tag)

# update future with response body to communicate it back up to main thread
return request_future.set_result(RabbitMqMessage(str(body, encoding='utf-8'), properties))


def subscribe_to_queue(
connection: Conn | BlockingConnection,
rmq_params: RabbitMqParams,
Expand Down Expand Up @@ -552,7 +685,7 @@ def _setup_exch(channel: Channel, exch: Exch):
# pylint: disable=too-many-arguments
def _publish(channel: BlockingChannel,
exch: Exch,
message_params: PublishMessageParams,
message_params: RabbitMqMessage,
queue: Queue | None = None,
success_flag: list[bool] = None,
done_event: Event = None):
Expand All @@ -563,8 +696,8 @@ def _publish(channel: BlockingChannel,
channel (BlockingChannel): the pika channel to use to publish.
exch (Exch): parameters for the RabbitMQ exchange to publish message to.
message_params (PublishMessageParams): the message body to publish, plus properties
and (optional) route_key.
message_params (RabbitMqMessage): the message body to publish, plus properties and
(optional) route_key.
queue (optional, Queue | None): parameters for RabbitMQ queue, if message is being
published to a "temporary"/"private" message queue. The published message will be
purged from this queue after its TTL expires.
Expand All @@ -582,7 +715,7 @@ def _publish(channel: BlockingChannel,
channel.basic_publish(
exch.name,
message_params.route_key if message_params.route_key else exch.route_key,
body=message_params.message,
body=message_params.body,
properties=message_params.properties,
mandatory=exch.mandatory
)
Expand All @@ -606,7 +739,7 @@ def _publish(channel: BlockingChannel,
def _blocking_publish(
channel: BlockingChannel,
exch: Exch,
message_params: PublishMessageParams,
message_params: RabbitMqMessage,
queue: Queue | None = None,
) -> bool:
"""
Expand All @@ -616,8 +749,8 @@ def _blocking_publish(
Args:
channel (BlockingChannel): the pika channel to use to publish.
exch (Exch): parameters for the RabbitMQ exchange to publish message to.
message_params (PublishMessageParams): the message body to publish, plus properties
and (optional) route_key
message_params (RabbitMqMessage): the message body to publish, plus properties and
(optional) route_key
queue (optional, Queue | None): parameters for RabbitMQ queue, if message is being
published to a "temporary"/"private" message queue. The published message will be
purged from this queue after its TTL expires.
Expand Down
2 changes: 0 additions & 2 deletions python/idsse_common/idsse/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def exec_cmd(commands: Sequence[str], timeout: int | None = None) -> Sequence[st

def to_iso(date_time: datetime) -> str:
"""Format a datetime instance to an ISO string"""
logger.debug('Datetime (%s) to iso', datetime)
return (f'{date_time.strftime("%Y-%m-%dT%H:%M")}:'
f'{(date_time.second + date_time.microsecond / 1e6):06.3f}'
'Z' if date_time.tzname() in [None, str(timezone.utc)]
Expand All @@ -126,7 +125,6 @@ def to_iso(date_time: datetime) -> str:

def to_compact(date_time: datetime) -> str:
"""Format a datetime instance to a compact string"""
logger.debug('Datetime (%s) to compact -- %s', datetime, __name__)
return date_time.strftime('%Y%m%d%H%M%S')


Expand Down
Loading

0 comments on commit 46fce65

Please sign in to comment.