Skip to content

Added threading feature. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions aws_sqs_consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import boto3
import time
import traceback
import threading
import atexit
from uuid import uuid4
from typing import List

from .error import SQSException
Expand All @@ -27,7 +30,10 @@ def __init__(
batch_size=1,
wait_time_seconds=1,
visibility_timeout_seconds=None,
polling_wait_time_ms=0
polling_wait_time_ms=0,
daemon: bool = True,
thread_name: str = "consumer",
threaded: bool = True
):
self.queue_url = queue_url
self.attribute_names = attribute_names
Expand All @@ -41,6 +47,10 @@ def __init__(
self.wait_time_seconds = wait_time_seconds
self.visibility_timeout_seconds = visibility_timeout_seconds
self.polling_wait_time_ms = polling_wait_time_ms
self.daemon = daemon
self.thread_name_prefix = "aws_sqs_thread" + thread_name
self._sqs_thread = None
self.threaded = threaded
if region:
self._sqs_client = sqs_client or boto3.client(
"sqs", region_name=region)
Expand All @@ -52,6 +62,7 @@ def __init__(
raise Exception("Please specify the region parameter or set \
AWS_DEFAULT_REGION env variable.")
self._running = False
atexit.register(self.stop)

def handle_message(self, message: Message):
"""
Expand Down Expand Up @@ -107,7 +118,6 @@ def start(self):
"""
Start the consumer.
"""
# TODO: Figure out threading/daemon
self._running = True
while self._running:
response = self._sqs_client.receive_message(
Expand All @@ -131,8 +141,24 @@ def stop(self):
"""
Stop the consumer.
"""
# TODO: There's no way to invoke this other than a separate thread.
self._running = False
if not self.daemon:
self._sqs_thread.join()

def start_consumer(self):
"""
Starts the process of receiving sqs messages either in main
thread (if threaded=False) or separate thread (if threaded=True)
depending on threaded.
"""
if not self.threaded:
self.start()
else:
thread_name = self.thread_name_prefix + str(uuid4())
self._sqs_thread = threading.Thread(target=self.start,
name=thread_name,
daemon=self.daemon)
self._sqs_thread.start()

def _process_message(self, message: Message):
try:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws_sqs_consumer"
version = "0.0.15"
version = "0.0.16"
description = "AWS SQS Consumer"
authors = ["Hexmos Technology <[email protected]>"]
license = "MIT"
Expand Down