Skip to content

Commit

Permalink
fix: batch processing exceptions (#276)
Browse files Browse the repository at this point in the history
* fix: Add tracebacks for all caught exceptions to SQSBatchProcessingError output

* chore: Remove debugging code

* chore: fix unit test to account for change in exception passing

* chore: use kwargs in method calls, add comment to SQSBatchProcessingError

* chore: Fix tests after changing method calls to use kwargs
  • Loading branch information
Tom McCarthy authored Feb 2, 2021
1 parent 575a103 commit be6aa08
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 17 deletions.
7 changes: 4 additions & 3 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def success_handler(self, record: Any, result: Any):
self.success_messages.append(record)
return entry

def failure_handler(self, record: Any, exception: Exception):
def failure_handler(self, record: Any, exception: Tuple):
"""
Failure callback
Expand All @@ -94,8 +94,9 @@ def failure_handler(self, record: Any, exception: Exception):
tuple
"fail", exceptions args, original record
"""
entry = ("fail", exception.args, record)
logger.debug(f"Record processing exception: {exception}")
exception_string = f"{exception[0]}:{exception[1]}"
entry = ("fail", exception_string, record)
logger.debug(f"Record processing exception: {exception_string}")
self.exceptions.append(exception)
self.fail_messages.append(record)
return entry
Expand Down
18 changes: 18 additions & 0 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
"""
Batch processing exceptions
"""
import traceback


class SQSBatchProcessingError(Exception):
"""When at least one message within a batch could not be processed"""

def __init__(self, msg="", child_exceptions=()):
super().__init__(msg)
self.msg = msg
self.child_exceptions = child_exceptions

# Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent
# errors being lost. See https://github.com/awslabs/aws-lambda-powertools-python/issues/275
def __str__(self):
parent_exception_str = super(SQSBatchProcessingError, self).__str__()
exception_list = [f"{parent_exception_str}\n"]
for exception in self.child_exceptions:
extype, ex, tb = exception
formatted = "".join(traceback.format_exception(extype, ex, tb))
exception_list.append(formatted)

return "\n".join(exception_list)
15 changes: 10 additions & 5 deletions aws_lambda_powertools/utilities/batch/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Batch SQS utilities
"""
import logging
import sys
from typing import Callable, Dict, List, Optional, Tuple

import boto3
Expand Down Expand Up @@ -90,10 +91,10 @@ def _process_record(self, record) -> Tuple:
An object to be processed.
"""
try:
result = self.handler(record)
return self.success_handler(record, result)
except Exception as exc:
return self.failure_handler(record, exc)
result = self.handler(record=record)
return self.success_handler(record=record, result=result)
except Exception:
return self.failure_handler(record=record, exception=sys.exc_info())

def _prepare(self):
"""
Expand Down Expand Up @@ -123,7 +124,11 @@ def _clean(self):
logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed")
else:
logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception")
raise SQSBatchProcessingError(list(self.exceptions))
raise SQSBatchProcessingError(
msg=f"Not all records processed succesfully. {len(self.exceptions)} individual errors logged "
f"separately below.",
child_exceptions=self.exceptions,
)

return delete_message_response

Expand Down
10 changes: 5 additions & 5 deletions tests/functional/test_utilities_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha
with partial_processor(records, record_handler) as ctx:
ctx.process()

assert len(error.value.args[0]) == 1
assert len(error.value.child_exceptions) == 1
stubber.assert_no_pending_responses()


Expand Down Expand Up @@ -144,7 +144,7 @@ def lambda_handler(event, context):
with pytest.raises(SQSBatchProcessingError) as error:
lambda_handler(event, {})

assert len(error.value.args[0]) == 2
assert len(error.value.child_exceptions) == 2
stubber.assert_no_pending_responses()


Expand All @@ -171,7 +171,7 @@ def lambda_handler(event, context):
with pytest.raises(SQSBatchProcessingError) as error:
lambda_handler(event, {})

assert len(error.value.args[0]) == 1
assert len(error.value.child_exceptions) == 1
stubber.assert_no_pending_responses()


Expand Down Expand Up @@ -203,7 +203,7 @@ def lambda_handler(event, context):

stubber.assert_no_pending_responses()

assert len(error.value.args[0]) == 1
assert len(error.value.child_exceptions) == 1
assert capsys.readouterr().out == "Oh no ! It's a failure.\n"


Expand Down Expand Up @@ -289,4 +289,4 @@ def test_partial_sqs_processor_context_only_failure(sqs_event_factory, record_ha
with partial_processor(records, record_handler) as ctx:
ctx.process()

assert len(error.value.args[0]) == 2
assert len(error.value.child_exceptions) == 2
12 changes: 8 additions & 4 deletions tests/unit/test_utilities_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def test_partial_sqs_process_record_success(mocker, partial_sqs_processor):

result = partial_sqs_processor._process_record(record)

handler_mock.assert_called_once_with(record)
success_handler_mock.assert_called_once_with(record, success_result)
handler_mock.assert_called_once_with(record=record)
success_handler_mock.assert_called_once_with(record=record, result=success_result)

assert result == expected_value

Expand All @@ -98,9 +98,13 @@ def test_partial_sqs_process_record_failure(mocker, partial_sqs_processor):

result = partial_sqs_processor._process_record(record)

handler_mock.assert_called_once_with(record)
failure_handler_mock.assert_called_once_with(record, failure_result)
handler_mock.assert_called_once_with(record=record)

_, failure_handler_called_with_args = failure_handler_mock.call_args
failure_handler_mock.assert_called_once()
assert (failure_handler_called_with_args["record"]) == record
assert isinstance(failure_handler_called_with_args["exception"], tuple)
assert failure_handler_called_with_args["exception"][1] == failure_result
assert result == expected_value


Expand Down

0 comments on commit be6aa08

Please sign in to comment.