From 4e570ca13f9b53ebdc5018fa485f08e4130c65ff Mon Sep 17 00:00:00 2001 From: Khoi Le Date: Tue, 27 Aug 2024 15:09:53 -0400 Subject: [PATCH 1/3] Add a how_tos file to show basic examples of how to configure and settle of message using negative acknowledgement --- howtos/pubsub/how_to_consume_persistent_message.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/howtos/pubsub/how_to_consume_persistent_message.py b/howtos/pubsub/how_to_consume_persistent_message.py index bba118c..16141f8 100644 --- a/howtos/pubsub/how_to_consume_persistent_message.py +++ b/howtos/pubsub/how_to_consume_persistent_message.py @@ -5,7 +5,7 @@ from concurrent.futures.thread import ThreadPoolExecutor from typing import TypeVar -from compose.const import DEFAULT_TIMEOUT +# from compose.const import DEFAULT_TIMEOUT from solace.messaging.messaging_service import MessagingService from solace.messaging.config.solace_properties import service_properties @@ -197,7 +197,7 @@ def consume_full_message_and_do_ack_using_receive_message(messaging_service, top with ThreadPoolExecutor(max_workers=1) as e: time.sleep(2) e.submit(HowToPublishPersistentMessage.publish_string_message_non_blocking, publisher, topic, message) - message_received: 'InboundMessage' = receiver.receive_message(timeout=DEFAULT_TIMEOUT) + message_received: 'InboundMessage' = receiver.receive_message(timeout=1000) receiver.ack(message_received) print(f"received message: {message_received.get_payload_as_string()}") @@ -233,7 +233,7 @@ def consume_using_message_selector(messaging_service, topic, publisher, message) e.submit(HowToPublishPersistentMessage.publish_typed_message_with_extended_message_props_non_blocking, outbound_msg, publisher, topic, message, additional_message_properties) - message_received: 'InboundMessage' = receiver.receive_message(timeout=DEFAULT_TIMEOUT) + message_received: 'InboundMessage' = receiver.receive_message(timeout=1000) receiver.ack(message_received) print(f"received message: {message_received.get_payload_as_string()}") From dd9a30d0da96937d5d080588c91061e18bce5eb8 Mon Sep 17 00:00:00 2001 From: Khoi Le Date: Tue, 27 Aug 2024 15:22:38 -0400 Subject: [PATCH 2/3] Fixing commit on the consume_persistent_message file and add the correct how_tos file --- .../how_to_consume_persistent_message.py | 6 +- ...t_message_with_negative_acknowledgement.py | 108 ++++++++++++++++++ 2 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py diff --git a/howtos/pubsub/how_to_consume_persistent_message.py b/howtos/pubsub/how_to_consume_persistent_message.py index 16141f8..bba118c 100644 --- a/howtos/pubsub/how_to_consume_persistent_message.py +++ b/howtos/pubsub/how_to_consume_persistent_message.py @@ -5,7 +5,7 @@ from concurrent.futures.thread import ThreadPoolExecutor from typing import TypeVar -# from compose.const import DEFAULT_TIMEOUT +from compose.const import DEFAULT_TIMEOUT from solace.messaging.messaging_service import MessagingService from solace.messaging.config.solace_properties import service_properties @@ -197,7 +197,7 @@ def consume_full_message_and_do_ack_using_receive_message(messaging_service, top with ThreadPoolExecutor(max_workers=1) as e: time.sleep(2) e.submit(HowToPublishPersistentMessage.publish_string_message_non_blocking, publisher, topic, message) - message_received: 'InboundMessage' = receiver.receive_message(timeout=1000) + message_received: 'InboundMessage' = receiver.receive_message(timeout=DEFAULT_TIMEOUT) receiver.ack(message_received) print(f"received message: {message_received.get_payload_as_string()}") @@ -233,7 +233,7 @@ def consume_using_message_selector(messaging_service, topic, publisher, message) e.submit(HowToPublishPersistentMessage.publish_typed_message_with_extended_message_props_non_blocking, outbound_msg, publisher, topic, message, additional_message_properties) - message_received: 'InboundMessage' = receiver.receive_message(timeout=1000) + message_received: 'InboundMessage' = receiver.receive_message(timeout=DEFAULT_TIMEOUT) receiver.ack(message_received) print(f"received message: {message_received.get_payload_as_string()}") diff --git a/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py b/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py new file mode 100644 index 0000000..a03d927 --- /dev/null +++ b/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py @@ -0,0 +1,108 @@ +"""sampler to consume persistent message with negative acknowledgement""" + +import threading +from typing import TypeVar + +from solace.messaging.messaging_service import MessagingService +from solace.messaging.receiver.inbound_message import InboundMessage +from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver +from solace.messaging.resources.queue import Queue +from solace.messaging.resources.topic import Topic +from solace.messaging.resources.topic_subscription import TopicSubscription +from solace.messaging.config.message_acknowledgement_configuration import Outcome +from howtos.pubsub.how_to_consume_persistent_message import HowToConsumeMessageExclusiveVsSharedMode +from howtos.pubsub.how_to_publish_persistent_message import HowToPublishPersistentMessage +from howtos.sampler_boot import SolaceConstants, SamplerBoot, BasicTestMessageHandler +from howtos.sampler_master import SamplerMaster + +X = TypeVar('X') +constants = SolaceConstants +boot = SamplerBoot() +lock = threading.Lock() + +topic_name = constants.TOPIC_ENDPOINT_DEFAULT +topic = Topic.of(topic_name) + + +class HowToSettlePersistentMessageWithNegativeAcknowledgement: + """class contains methods to settle message with negative acknowledgement""" + + @staticmethod + def settle_message_sync(service: MessagingService, publisher, message, required_outcomes, outcome_to_settle): + try: + queue_name = constants.QUEUE_NAME_FORMAT.substitute(iteration=topic_name) + HowToConsumeMessageExclusiveVsSharedMode.create_queue_and_add_topic(queue_name) + queue_to_consume = Queue.durable_exclusive_queue(queue_name) + + receiver: PersistentMessageReceiver = service.create_persistent_message_receiver_builder() \ + .with_required_message_outcome_support(*required_outcomes).build(queue_to_consume) + receiver.start() + print(f'PERSISTENT receiver started for sync receiver... Listening to Queue [{queue_to_consume.get_name()}]') + receiver.add_subscription(TopicSubscription.of(topic_name)) + + HowToPublishPersistentMessage.publish_string_message_non_blocking(publisher, topic, message) + + message: InboundMessage = receiver.receive_message() + print(f"the message payload is {message.get_payload_as_string()}") + receiver.settle(message, outcome_to_settle) + finally: + receiver.terminate() + HowToConsumeMessageExclusiveVsSharedMode.delete_queue(queue_to_consume.get_name()) + + @staticmethod + def settle_message_async(service: MessagingService, publisher, message, required_outcomes, outcome_to_settle): + event = threading.Event() + receiver = None + + def receiver_callback(self, message: InboundMessage): + # Fail messages will be redelivered so we just want to settle the actual message that is not redelivered + if not message.is_redelivered(): + print(f"the message payload is {message.get_payload_as_string()}") + receiver.settle(message, outcome_to_settle) + event.set() + + try: + queue_name = constants.QUEUE_NAME_FORMAT.substitute(iteration=topic_name) + HowToConsumeMessageExclusiveVsSharedMode.create_queue_and_add_topic(queue_name) + queue_to_consume = Queue.durable_exclusive_queue(queue_name) + + receiver: PersistentMessageReceiver = service.create_persistent_message_receiver_builder() \ + .with_required_message_outcome_support(*required_outcomes).build(queue_to_consume) + receiver.start() + print(f'PERSISTENT receiver started for async receiver... Listening to Queue [{queue_to_consume.get_name()}]') + receiver.add_subscription(TopicSubscription.of(topic_name)) + message_handler = BasicTestMessageHandler(test_callback=receiver_callback) + receiver.receive_async(message_handler) + + HowToPublishPersistentMessage.publish_string_message_non_blocking(publisher, topic, message) + event.wait(5) + finally: + receiver.terminate() + HowToConsumeMessageExclusiveVsSharedMode.delete_queue(queue_to_consume.get_name()) + + @staticmethod + def run(): + try: + # Set up for required outcomes and outcome to settle + required_outcomes = (Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED) + outcome_to_settle = (Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED) + + message = constants.MESSAGE_TO_SEND + messaging_service = SamplerMaster.connect_messaging_service() + + publisher = HowToPublishPersistentMessage.create_persistent_message_publisher(messaging_service) + + for outcome in outcome_to_settle: + HowToSettlePersistentMessageWithNegativeAcknowledgement \ + .settle_message_sync(service=messaging_service, publisher=publisher, message=message, + required_outcomes=required_outcomes, outcome_to_settle=outcome) + + HowToSettlePersistentMessageWithNegativeAcknowledgement \ + .settle_message_async(service=messaging_service, publisher=publisher, message=message, + required_outcomes=required_outcomes, outcome_to_settle=outcome) + finally: + messaging_service.disconnect() + + +if __name__ == '__main__': + HowToSettlePersistentMessageWithNegativeAcknowledgement.run() From 374a20f78a392c407022bceab19af6ada970b6e8 Mon Sep 17 00:00:00 2001 From: Khoi Le Date: Wed, 28 Aug 2024 10:09:14 -0400 Subject: [PATCH 3/3] Add metrics to check the stats --- ...t_message_with_negative_acknowledgement.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py b/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py index a03d927..a014cc5 100644 --- a/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py +++ b/howtos/pubsub/how_to_consume_persistent_message_with_negative_acknowledgement.py @@ -10,6 +10,7 @@ from solace.messaging.resources.topic import Topic from solace.messaging.resources.topic_subscription import TopicSubscription from solace.messaging.config.message_acknowledgement_configuration import Outcome +from solace.messaging.utils.manageable import Metric from howtos.pubsub.how_to_consume_persistent_message import HowToConsumeMessageExclusiveVsSharedMode from howtos.pubsub.how_to_publish_persistent_message import HowToPublishPersistentMessage from howtos.sampler_boot import SolaceConstants, SamplerBoot, BasicTestMessageHandler @@ -45,6 +46,16 @@ def settle_message_sync(service: MessagingService, publisher, message, required_ message: InboundMessage = receiver.receive_message() print(f"the message payload is {message.get_payload_as_string()}") receiver.settle(message, outcome_to_settle) + + metrics = service.metrics() + message_settle_accepted = metrics.get_value(Metric.PERSISTENT_MESSSAGES_ACCEPTED) + message_settle_rejected = metrics.get_value(Metric.PERSISTENT_MESSSAGES_REJECTED) + message_settle_failed = metrics.get_value(Metric.PERSISTENT_MESSSAGES_FAILED) + + print(f"Message Settle Accepted: {message_settle_accepted}") + print(f"Message Settle Rejected: {message_settle_rejected}") + print(f"Message Settle Failed: {message_settle_failed}") + finally: receiver.terminate() HowToConsumeMessageExclusiveVsSharedMode.delete_queue(queue_to_consume.get_name()) @@ -76,6 +87,16 @@ def receiver_callback(self, message: InboundMessage): HowToPublishPersistentMessage.publish_string_message_non_blocking(publisher, topic, message) event.wait(5) + + metrics = service.metrics() + message_settle_accepted = metrics.get_value(Metric.PERSISTENT_MESSSAGES_ACCEPTED) + message_settle_rejected = metrics.get_value(Metric.PERSISTENT_MESSSAGES_REJECTED) + message_settle_failed = metrics.get_value(Metric.PERSISTENT_MESSSAGES_FAILED) + + print(f"Message Settle Accepted: {message_settle_accepted}") + print(f"Message Settle Rejected: {message_settle_rejected}") + print(f"Message Settle Failed: {message_settle_failed}") + finally: receiver.terminate() HowToConsumeMessageExclusiveVsSharedMode.delete_queue(queue_to_consume.get_name()) @@ -92,6 +113,8 @@ def run(): publisher = HowToPublishPersistentMessage.create_persistent_message_publisher(messaging_service) + # We will settle the message 2 ways, sync and async, for each Outcome + # Therefore, at the end, we will see each message get settle 2 times for outcome in outcome_to_settle: HowToSettlePersistentMessageWithNegativeAcknowledgement \ .settle_message_sync(service=messaging_service, publisher=publisher, message=message,