From fb59eb2c1b54841ebea4b07dc518d306c1741b06 Mon Sep 17 00:00:00 2001 From: Gautam A K Date: Sun, 13 Oct 2024 21:44:47 +0530 Subject: [PATCH 1/4] added threading to consumer --- aws_sqs_consumer/consumer.py | 27 ++++++++++++++++++++++++++- pyproject.toml | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/aws_sqs_consumer/consumer.py b/aws_sqs_consumer/consumer.py index 28f1244..4f44883 100644 --- a/aws_sqs_consumer/consumer.py +++ b/aws_sqs_consumer/consumer.py @@ -6,6 +6,9 @@ import boto3 import time import traceback +import threading +import atexit +from uuid import uuid4 from typing import List from .error import SQSException @@ -27,7 +30,10 @@ def __init__( batch_size=1, wait_time_seconds=1, visibility_timeout_seconds=None, - polling_wait_time_ms=0 + polling_wait_time_ms=0, + daemon: bool=True, + thread_name: str="consumer", + threaded: bool=True ): self.queue_url = queue_url self.attribute_names = attribute_names @@ -41,6 +47,10 @@ def __init__( self.wait_time_seconds = wait_time_seconds self.visibility_timeout_seconds = visibility_timeout_seconds self.polling_wait_time_ms = polling_wait_time_ms + self.daemon = daemon + self.thread_name_prefix = "aws_sqs_thread" + thread_name + self._sqs_thread = None + self.threaded = threaded if region: self._sqs_client = sqs_client or boto3.client( "sqs", region_name=region) @@ -52,6 +62,7 @@ def __init__( raise Exception("Please specify the region parameter or set \ AWS_DEFAULT_REGION env variable.") self._running = False + atexit.register(self.stop) def handle_message(self, message: Message): """ @@ -134,6 +145,18 @@ def stop(self): # TODO: There's no way to invoke this other than a separate thread. self._running = False + def start_consumer(self): + """ + Starts the process of receiving sqs messages either in main thread (if threaded=False) or + separate thread (if threaded=True) depending on threaded + """ + if not self.threaded: + self.start() + else: + thread_name = self.thread_name_prefix + str(uuid4()) + self._sqs_thread = threading.Thread(target=self.start, name=thread_name, daemon=self.daemon) + self._sqs_thread.start() + def _process_message(self, message: Message): try: self.handle_message(message) @@ -192,3 +215,5 @@ def _sqs_client_params(self): def _polling_wait(self): time.sleep(self.polling_wait_time_ms / 1000) + + diff --git a/pyproject.toml b/pyproject.toml index e73098c..81a2624 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws_sqs_consumer" -version = "0.0.15" +version = "0.0.16" description = "AWS SQS Consumer" authors = ["Hexmos Technology "] license = "MIT" From 03e76ce54b3fec49fe9699db9159bf5c0f0fdece Mon Sep 17 00:00:00 2001 From: Gautam A K Date: Sun, 13 Oct 2024 22:03:38 +0530 Subject: [PATCH 2/4] Changes for alignment with PEP-8 standards --- aws_sqs_consumer/consumer.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/aws_sqs_consumer/consumer.py b/aws_sqs_consumer/consumer.py index 4f44883..db9601d 100644 --- a/aws_sqs_consumer/consumer.py +++ b/aws_sqs_consumer/consumer.py @@ -31,9 +31,9 @@ def __init__( wait_time_seconds=1, visibility_timeout_seconds=None, polling_wait_time_ms=0, - daemon: bool=True, - thread_name: str="consumer", - threaded: bool=True + daemon: bool = True, + thread_name: str = "consumer", + threaded: bool = True ): self.queue_url = queue_url self.attribute_names = attribute_names @@ -147,14 +147,17 @@ def stop(self): def start_consumer(self): """ - Starts the process of receiving sqs messages either in main thread (if threaded=False) or - separate thread (if threaded=True) depending on threaded + Starts the process of receiving sqs messages either in main + thread (if threaded=False) or separate thread (if threaded=True) + depending on threaded. """ if not self.threaded: self.start() else: thread_name = self.thread_name_prefix + str(uuid4()) - self._sqs_thread = threading.Thread(target=self.start, name=thread_name, daemon=self.daemon) + self._sqs_thread = threading.Thread(target=self.start, + name=thread_name, + daemon=self.daemon) self._sqs_thread.start() def _process_message(self, message: Message): @@ -215,5 +218,3 @@ def _sqs_client_params(self): def _polling_wait(self): time.sleep(self.polling_wait_time_ms / 1000) - - From 41d2ff549953aec526b4a6aeb9b42f49884a2bfa Mon Sep 17 00:00:00 2001 From: Gautam A K Date: Sun, 13 Oct 2024 22:05:39 +0530 Subject: [PATCH 3/4] minor changes --- aws_sqs_consumer/consumer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/aws_sqs_consumer/consumer.py b/aws_sqs_consumer/consumer.py index db9601d..4b2218b 100644 --- a/aws_sqs_consumer/consumer.py +++ b/aws_sqs_consumer/consumer.py @@ -118,7 +118,6 @@ def start(self): """ Start the consumer. """ - # TODO: Figure out threading/daemon self._running = True while self._running: response = self._sqs_client.receive_message( @@ -142,7 +141,6 @@ def stop(self): """ Stop the consumer. """ - # TODO: There's no way to invoke this other than a separate thread. self._running = False def start_consumer(self): From 65dbb245876a43c6c1f696ab1e7b61c856931002 Mon Sep 17 00:00:00 2001 From: Gautam A K Date: Sun, 13 Oct 2024 22:45:04 +0530 Subject: [PATCH 4/4] changes to join daemon thread at exit --- aws_sqs_consumer/consumer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aws_sqs_consumer/consumer.py b/aws_sqs_consumer/consumer.py index 4b2218b..6e22079 100644 --- a/aws_sqs_consumer/consumer.py +++ b/aws_sqs_consumer/consumer.py @@ -142,6 +142,8 @@ def stop(self): Stop the consumer. """ self._running = False + if not self.daemon: + self._sqs_thread.join() def start_consumer(self): """