-
Notifications
You must be signed in to change notification settings - Fork 81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broadcast revamp rebase #1769
Broadcast revamp rebase #1769
Conversation
WalkthroughThis pull request introduces comprehensive asynchronous enhancements to WhatsApp messaging and broadcasting functionality across multiple files. The changes focus on adding async methods for sending messages, implementing retry mechanisms, and improving error handling in WhatsApp communication channels. New configuration parameters for broadcast rate and batch size are added, and corresponding unit tests are created to validate the new asynchronous methods and their error handling capabilities. Changes
Sequence DiagramsequenceDiagram
participant Client
participant WhatsappCloud
participant WhatsappAPI
Client->>WhatsappCloud: send_async(payload)
WhatsappCloud->>WhatsappAPI: Send message
alt Successful Send
WhatsappAPI-->>WhatsappCloud: Success Response
WhatsappCloud-->>Client: (True, status_code, response)
else Send Failure
WhatsappAPI-->>WhatsappCloud: Error Response
WhatsappCloud-->>Client: (False, status_code, error)
end
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (13)
kairon/chat/handlers/channels/clients/whatsapp/dialog360.py (1)
48-83
: Consider broadening the success condition and explicit exception handling.
- The success condition currently checks for status code
200
only. You might consider verifying if other 2xx status codes should also be deemed successful (e.g., 201 or 202).- A potential improvement is catching
asyncio.TimeoutError
explicitly for better debugging and user feedback.- if response.status == 200: + if 200 <= response.status < 300: resp = await response.json() return True, response.status, respkairon/chat/handlers/channels/clients/whatsapp/cloud.py (2)
101-130
: Asynchronous message sending aligned with synchronous counterpart.The logic to validate
messaging_type
and construct the request body is parallel to the synchronous method. This is a good approach to maintain consistency.Consider adding extra debug logs when constructing the request body to aid troubleshooting.
222-261
: Flexible retry with clear error handling.
- Using
aiohttp_retry
with exponential backoff is ideal for transient failures.- Similar to the “dialog360.py” suggestion, consider expanding the success range to all 2xx codes.
- Explicitly catching
asyncio.TimeoutError
might further refine error handling.kairon/shared/channels/broadcast/whatsapp.py (3)
77-97
: Retry logic is clearly separated.The dedicated async retry method cleanly logs results under the “resend” event type. This separation of concerns is maintainable.
Consider clarifying whether the retry method imposes a delay or backoff; if not, a short delay may help with rate limits.
98-118
: Consistent logging for failed messages.The function centralizes logging for quick triaging and references the broadcast log type.
Consider using a named tuple or small data class instead of positional unpacking to improve clarity when reading and maintaining these parameters.
Line range hint
170-342
: Potential concurrency concerns with repeated asyncio.run calls.
- Repeatedly calling
asyncio.run(...)
in a loop can block the main thread for each chunk.- Consider creating an async wrapper around
initiate_broadcast
and executing the entire process under a singleasyncio.run(...)
to improve performance and simplicity.tests/unit_test/events/events_test.py (2)
Line range hint
1607-1618
: Verify repeated 'template_params' references
Both lines add'template_params'
with similar definitions. Double-check there is no unnecessary duplication that might cause confusion or overshadow data. Otherwise, no functional concerns.
2073-2087
: Streamline repetitive log verification
This block ensures that logs contain'Completed'
status,'config'
fields, and references to the bot. If this pattern repeats across tests, consider a helper method or utility to reduce duplication.system.yaml (1)
265-267
: LGTM! Consider adding documentation for the new broadcast parameters.The new broadcast configuration section with rate limiting and batch size parameters is well-structured. The default values (20 messages/second, batch size of 10) seem reasonable for production use.
Consider adding documentation comments above the broadcast section to explain:
- The purpose and impact of these parameters
- Recommended values for different scales of operation
- Any limitations or constraints to be aware of
tests/testing_data/system.yaml (1)
265-265
: Add newline at end of file.Add a newline character at the end of the file to comply with YAML best practices.
whatsapp_broadcast_batch_size: ${WHATSAPP_BROADCAST_BATCH_SIZE:10} +
🧰 Tools
🪛 YAMLlint (1.35.1)
[error] 265-265: no new line character at the end of file
(new-line-at-end-of-file)
tests/unit_test/channels/whatsapp_broadcast_test.py (3)
1-15
: Remove unused import.The
asyncio
import is not used directly in the code.-import asyncio import os from unittest.mock import patch, MagicMock
🧰 Tools
🪛 Ruff (0.8.2)
1-1:
asyncio
imported but unusedRemove unused import:
asyncio
(F401)
445-447
: Remove unused mock variables.The
mock_get_client
andmock_channel_client
variables are assigned but never used in the test cases.- with patch.object(WhatsappBroadcast, '_WhatsappBroadcast__get_client', return_value=MagicMock()) as mock_get_client, \ - patch.object(whatsapp_broadcast, 'channel_client', new_callable=MagicMock) as mock_channel_client, \ + with patch.object(whatsapp_broadcast, 'channel_client', new_callable=MagicMock), \Also applies to: 489-491
🧰 Tools
🪛 Ruff (0.8.2)
445-445: Local variable
mock_get_client
is assigned to but never usedRemove assignment to unused variable
mock_get_client
(F841)
446-446: Local variable
mock_channel_client
is assigned to but never usedRemove assignment to unused variable
mock_channel_client
(F841)
109-109
: Remove debug print statement.The
print(url)
statement should be removed as it's not needed for testing.- print(url)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
kairon/chat/handlers/channels/clients/whatsapp/cloud.py
(5 hunks)kairon/chat/handlers/channels/clients/whatsapp/dialog360.py
(2 hunks)kairon/shared/channels/broadcast/from_config.py
(1 hunks)kairon/shared/channels/broadcast/whatsapp.py
(8 hunks)system.yaml
(1 hunks)tests/testing_data/system.yaml
(1 hunks)tests/unit_test/action/kremote_action_test.py
(0 hunks)tests/unit_test/channels/mail_channel_test.py
(1 hunks)tests/unit_test/channels/whatsapp_broadcast_test.py
(1 hunks)tests/unit_test/events/events_test.py
(54 hunks)
💤 Files with no reviewable changes (1)
- tests/unit_test/action/kremote_action_test.py
✅ Files skipped from review due to trivial changes (1)
- tests/unit_test/channels/mail_channel_test.py
🧰 Additional context used
🪛 YAMLlint (1.35.1)
tests/testing_data/system.yaml
[error] 265-265: no new line character at the end of file
(new-line-at-end-of-file)
🪛 Ruff (0.8.2)
tests/unit_test/channels/whatsapp_broadcast_test.py
1-1: asyncio
imported but unused
Remove unused import: asyncio
(F401)
19-19: Use capitalized environment variable SYSTEM_FILE
instead of system_file
Replace system_file
with SYSTEM_FILE
(SIM112)
445-445: Local variable mock_get_client
is assigned to but never used
Remove assignment to unused variable mock_get_client
(F841)
446-446: Local variable mock_channel_client
is assigned to but never used
Remove assignment to unused variable mock_channel_client
(F841)
489-489: Local variable mock_get_client
is assigned to but never used
Remove assignment to unused variable mock_get_client
(F841)
490-490: Local variable mock_channel_client
is assigned to but never used
Remove assignment to unused variable mock_channel_client
(F841)
kairon/shared/channels/broadcast/whatsapp.py
139-139: Function definition does not bind loop variable chunk
(B023)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Codacy Static Code Analysis
- GitHub Check: Python CI
- GitHub Check: Analyze (python)
🔇 Additional comments (37)
kairon/chat/handlers/channels/clients/whatsapp/dialog360.py (1)
2-3
: Imports look good.These libraries are necessary for async HTTP requests and retry mechanisms. Ensure the
aiohttp_retry
package is listed in your dependencies for deployment.Also applies to: 6-6
kairon/chat/handlers/channels/clients/whatsapp/cloud.py (2)
5-5
: Constants and imports are properly placed.The addition of
urlencode
,ClientError
classes, and theINVALID_STATUS_CODES
constant is neatly organized. DefiningWHATSAPP_REQUEST_TIMEOUT
also enhances clarity.Also applies to: 8-9, 18-18, 35-36
210-221
: Async template message sending is consistent with existing design.The method is straightforward and correctly delegates to
send_async()
. Validation of the message structure is consistent with the synchronous method.kairon/shared/channels/broadcast/whatsapp.py (3)
1-3
: Async and partial imports are valid.These imports facilitate asynchronous broadcasting and partial function application.
23-23
: Chunked iteration is an efficient approach.Importing
chunked
helps process broadcasts in manageable batches, improving readability.
54-76
: Template message method looks good.
- Awaiting
send_template_message_async()
ensures correct async workflow.- Logging with
MessageBroadcastProcessor.add_event_log
effectively captures result status.tests/unit_test/events/events_test.py (25)
1259-1259
: Confirm patch placement for async method
The patch decorator forsend_template_message_async
appears correctly placed. No issues found.
1324-1324
: Validate mock’s triple-return format
Mocking the triple return tuple(success_bool, http_status_code, json_response)
aligns with the updated asynchronous method signature. Looks good.
1396-1396
: Patch usage for async handler
The patch onsend_template_message_async
is consistent with the new asynchronous approach. No concerns here.
1460-1460
: Check consistency of return structure in mock
The return value includes the triple(True, 200, {...})
, matching the new function contract. No issues.
1500-1500
: Auto-spec patch for send_template_message_async
Usingautospec=True
helps ensure the signature is validated. This is a good practice.
1571-1571
: Triple-return mock setup
Again, the mock’s three-part return structure reflects the newly introduced contract. No problems found.
1636-1636
: Confirm patch for async message method
Everything looks consistent with the new asynchronous code. No issues noted.
1735-1735
: Additional patch on send_template_message_async
Another valid patch usage for the async method. No concerns found.
1764-1764
: Mocking the triple-return structure
Returning(True, 200, {...})
is in line with the expected response format. No issues.
1789-1789
: Ensure removal of 'template_params' is intentional
The code explicitly removes'template_params'
from logs. Verify that discarding these parameters is intended and does not impact debugging or analytics.
1797-1797
: Patch usage correctness
No issues with the new async patch decorator. Looks good.
1859-1859
: Return value consistency in mock
This mock’s triple structure again aligns with the updated method signature. No concerns.
1932-1932
: Adding 'errors' key to response logs
Introducing'errors': [...]
for capturing broadcast errors is a sound design choice. Appears consistent with the rest of the code.
1963-1963
: Auto-spec patch remains valid
The patch usage looks standard and correct. Approved.
2062-2062
: Mocking triple return once more
Maintaining consistent structure(bool, status_code, data)
across tests is good. No problems noted.
2108-2108
: Repeated log fetch
The direct call to retrieve logs is clear. No issues noticed.
2143-2143
: Log retrieval remains consistent
Another fetch of broadcast logs. No concerns.
2183-2183
: Additional broadcast log retrieval
Still straightforward. No issues to report.
2193-2193
: Error handling for missing template
Capturing'template_exception': 'Failed to load the template'
is a solid approach for debugging. No issues.
2257-2257
: Patch usage for send_template_message_async
Autospec patch usage validated. No concerns found.
2621-2621
: New test function for static resend broadcast
Adding coverage for a resend scenario is good. Implementation looks fine.
Line range hint
3079-3079
: Dynamic resend broadcast test function
Tests dynamic broadcast flows thoroughly. Good additions.
3701-3701
: Meta error code skip test
Ensures broadcasts skip certain error codes. The coverage is valuable. Approved.
4141-4141
: Multiple resend broadcast attempts test
Covers repeated resend logic. Implementation is solid.
4700-4700
: Broadcast chat history logging test
Ensures that chat history is correctly recorded for broadcast messages. No issues.tests/testing_data/system.yaml (1)
263-265
: LGTM! Test configuration aligns with production.The broadcast configuration in the test environment uses appropriate values for testing scenarios.
🧰 Tools
🪛 YAMLlint (1.35.1)
[error] 265-265: no new line character at the end of file
(new-line-at-end-of-file)
kairon/shared/channels/broadcast/from_config.py (1)
25-25
: LGTM! Channel client initialization added.The addition of
self.channel_client
is well-placed for managing asynchronous channel interactions.tests/unit_test/channels/whatsapp_broadcast_test.py (4)
17-25
: LGTM! Test environment setup is comprehensive.The fixture properly sets up the test environment with necessary configurations.
🧰 Tools
🪛 Ruff (0.8.2)
19-19: Use capitalized environment variable
SYSTEM_FILE
instead ofsystem_file
Replace
system_file
withSYSTEM_FILE
(SIM112)
27-39
: LGTM! Thorough testing of WhatsApp Cloud API success scenarios.The tests effectively verify both immediate success and retry success scenarios for the WhatsApp Cloud API.
Also applies to: 41-54
342-379
: LGTM! Comprehensive broadcast initiation testing.The test effectively verifies:
- Batch processing with configured size
- Rate limiting
- Success and failure scenarios
- Proper mocking of dependencies
507-534
: LGTM! Thorough error logging test.The test properly verifies that failed messages are logged with appropriate error details.
def initiate_broadcast(self, message_list: list, is_resend: bool=False): | ||
batch_size = Utility.environment["broadcast"]["whatsapp_broadcast_batch_size"] | ||
rate_per_second = Utility.environment["broadcast"]["whatsapp_broadcast_rate_per_second"] | ||
max_batches_per_second = rate_per_second // batch_size | ||
|
||
last_time = datetime.utcnow() | ||
batches_sent_in_current_second = 0 | ||
non_sent_recipients = [] | ||
sent_count = 0 | ||
done_till = 0 | ||
|
||
for chunk in chunked(message_list, batch_size): | ||
tasks = [] | ||
if is_resend: | ||
tasks = [self.send_template_message_retry(*msg) for msg in chunk] | ||
else: | ||
tasks = [self.send_template_message(*msg) for msg in chunk] | ||
async def run_async(tasks_to_run): | ||
failed_to_send_recipients = [] | ||
results = await asyncio.gather(*tasks_to_run) | ||
fails = [not result[0] for result in results] | ||
for msg, fail in zip(chunk, fails): | ||
if fail: | ||
failed_to_send_recipients.append(msg[1]) | ||
|
||
return all(fails) , failed_to_send_recipients | ||
|
||
all_failed, failed_recipients = asyncio.run(run_async(tasks)) | ||
done_till += len(tasks) | ||
|
||
if all_failed: | ||
break | ||
non_sent_recipients.extend(failed_recipients) | ||
sent_count += len(tasks) - len(failed_recipients) | ||
|
||
batches_sent_in_current_second += 1 | ||
|
||
if batches_sent_in_current_second >= max_batches_per_second: | ||
current_time = datetime.utcnow() | ||
time_diff = (current_time - last_time).total_seconds() | ||
if time_diff < 1: | ||
asyncio.run(asyncio.sleep(1 - time_diff)) | ||
last_time = datetime.utcnow() | ||
batches_sent_in_current_second = 0 | ||
|
||
failed_broadcast_log_type = MessageBroadcastLogType.resend.value if is_resend else MessageBroadcastLogType.send.value | ||
|
||
if done_till < len(message_list): | ||
self.log_failed_messages(message_list[done_till:], broadcast_log_type=failed_broadcast_log_type) | ||
|
||
return sent_count, non_sent_recipients | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid referencing loop variables in nested functions to comply with best practices.
At line 139, the static analysis hint (B023) is correct: referencing chunk
inside run_async
can cause subtle scoping issues.
Below is a proposed fix that explicitly passes chunk
:
for chunk in chunked(message_list, batch_size):
...
async def run_async(tasks_to_run, current_chunk):
results = await asyncio.gather(*tasks_to_run)
fails = [not result[0] for result in results]
- for msg, fail in zip(chunk, fails):
+ for msg, fail in zip(current_chunk, fails):
...
- all_failed, failed_recipients = asyncio.run(run_async(tasks))
+ all_failed, failed_recipients = asyncio.run(run_async(tasks, chunk))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def initiate_broadcast(self, message_list: list, is_resend: bool=False): | |
batch_size = Utility.environment["broadcast"]["whatsapp_broadcast_batch_size"] | |
rate_per_second = Utility.environment["broadcast"]["whatsapp_broadcast_rate_per_second"] | |
max_batches_per_second = rate_per_second // batch_size | |
last_time = datetime.utcnow() | |
batches_sent_in_current_second = 0 | |
non_sent_recipients = [] | |
sent_count = 0 | |
done_till = 0 | |
for chunk in chunked(message_list, batch_size): | |
tasks = [] | |
if is_resend: | |
tasks = [self.send_template_message_retry(*msg) for msg in chunk] | |
else: | |
tasks = [self.send_template_message(*msg) for msg in chunk] | |
async def run_async(tasks_to_run): | |
failed_to_send_recipients = [] | |
results = await asyncio.gather(*tasks_to_run) | |
fails = [not result[0] for result in results] | |
for msg, fail in zip(chunk, fails): | |
if fail: | |
failed_to_send_recipients.append(msg[1]) | |
return all(fails) , failed_to_send_recipients | |
all_failed, failed_recipients = asyncio.run(run_async(tasks)) | |
done_till += len(tasks) | |
if all_failed: | |
break | |
non_sent_recipients.extend(failed_recipients) | |
sent_count += len(tasks) - len(failed_recipients) | |
batches_sent_in_current_second += 1 | |
if batches_sent_in_current_second >= max_batches_per_second: | |
current_time = datetime.utcnow() | |
time_diff = (current_time - last_time).total_seconds() | |
if time_diff < 1: | |
asyncio.run(asyncio.sleep(1 - time_diff)) | |
last_time = datetime.utcnow() | |
batches_sent_in_current_second = 0 | |
failed_broadcast_log_type = MessageBroadcastLogType.resend.value if is_resend else MessageBroadcastLogType.send.value | |
if done_till < len(message_list): | |
self.log_failed_messages(message_list[done_till:], broadcast_log_type=failed_broadcast_log_type) | |
return sent_count, non_sent_recipients | |
def initiate_broadcast(self, message_list: list, is_resend: bool=False): | |
batch_size = Utility.environment["broadcast"]["whatsapp_broadcast_batch_size"] | |
rate_per_second = Utility.environment["broadcast"]["whatsapp_broadcast_rate_per_second"] | |
max_batches_per_second = rate_per_second // batch_size | |
last_time = datetime.utcnow() | |
batches_sent_in_current_second = 0 | |
non_sent_recipients = [] | |
sent_count = 0 | |
done_till = 0 | |
for chunk in chunked(message_list, batch_size): | |
tasks = [] | |
if is_resend: | |
tasks = [self.send_template_message_retry(*msg) for msg in chunk] | |
else: | |
tasks = [self.send_template_message(*msg) for msg in chunk] | |
async def run_async(tasks_to_run, current_chunk): | |
failed_to_send_recipients = [] | |
results = await asyncio.gather(*tasks_to_run) | |
fails = [not result[0] for result in results] | |
for msg, fail in zip(current_chunk, fails): | |
if fail: | |
failed_to_send_recipients.append(msg[1]) | |
return all(fails) , failed_to_send_recipients | |
all_failed, failed_recipients = asyncio.run(run_async(tasks, chunk)) | |
done_till += len(tasks) | |
if all_failed: | |
break | |
non_sent_recipients.extend(failed_recipients) | |
sent_count += len(tasks) - len(failed_recipients) | |
batches_sent_in_current_second += 1 | |
if batches_sent_in_current_second >= max_batches_per_second: | |
current_time = datetime.utcnow() | |
time_diff = (current_time - last_time).total_seconds() | |
if time_diff < 1: | |
asyncio.run(asyncio.sleep(1 - time_diff)) | |
last_time = datetime.utcnow() | |
batches_sent_in_current_second = 0 | |
failed_broadcast_log_type = MessageBroadcastLogType.resend.value if is_resend else MessageBroadcastLogType.send.value | |
if done_till < len(message_list): | |
self.log_failed_messages(message_list[done_till:], broadcast_log_type=failed_broadcast_log_type) | |
return sent_count, non_sent_recipients |
🧰 Tools
🪛 Ruff (0.8.2)
139-139: Function definition does not bind loop variable chunk
(B023)
Summary by CodeRabbit
Release Notes
New Features
Performance Improvements
Configuration Updates
Testing