From 75ed830ce2d17b901f9c8dd3f036ca11237382a7 Mon Sep 17 00:00:00 2001 From: Waldemar Hummer Date: Sun, 16 May 2021 00:59:16 +0200 Subject: [PATCH] fix integration test assertions to accommodate recent CloudWatch metrics changes --- tests/integration/test_integration.py | 45 +++++++++++++-------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index be2a6647a7840a..3a53a4c90c2b5a 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -21,7 +21,6 @@ TEST_LAMBDA_SOURCE_STREAM_NAME = 'test_source_stream' TEST_TABLE_NAME = 'test_stream_table' TEST_LAMBDA_NAME_DDB = 'test_lambda_ddb' -TEST_LAMBDA_NAME_STREAM = 'test_lambda_stream' TEST_LAMBDA_NAME_QUEUE = 'test_lambda_queue' TEST_FIREHOSE_NAME = 'test_firehose' TEST_BUCKET_NAME = lambda_integration.TEST_BUCKET_NAME @@ -157,6 +156,7 @@ def create_kinesis_stream(name, delete=False): ddb_lease_table_suffix = '-kclapp' table_name = TEST_TABLE_NAME + 'klsdss' + ddb_lease_table_suffix stream_name = TEST_STREAM_NAME + lambda_stream_name = 'lambda-stream-%s' % short_uid() dynamodb = aws_stack.connect_to_resource('dynamodb') dynamodb_service = aws_stack.connect_to_service('dynamodb') dynamodbstreams = aws_stack.connect_to_service('dynamodbstreams') @@ -207,7 +207,7 @@ def process_records(records, shard_id): # deploy test lambda connected to Kinesis Stream kinesis_event_source_arn = kinesis.describe_stream( StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME)['StreamDescription']['StreamARN'] - testutil.create_lambda_function(func_name=TEST_LAMBDA_NAME_STREAM, + testutil.create_lambda_function(func_name=lambda_stream_name, zip_file=zip_file, event_source_arn=kinesis_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27) # deploy test lambda connected to SQS queue @@ -252,27 +252,30 @@ def process_records(records, shard_id): }}) # put items to stream - num_events_kinesis = 10 - LOGGER.info('Putting %s items to stream...' % num_events_kinesis) + num_events_kinesis = 1 + num_kinesis_records = 10 + LOGGER.info('Putting %s records in %s event to stream...' % (num_kinesis_records, num_events_kinesis)) kinesis.put_records( Records=[ { 'Data': '{}', 'PartitionKey': 'testId%s' % i - } for i in range(0, num_events_kinesis) + } for i in range(0, num_kinesis_records) ], StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME ) # put 1 item to stream that will trigger an error in the Lambda - kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG, - PartitionKey='testIdError', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME) + num_events_kinesis_err = 1 + for i in range(num_events_kinesis_err): + kinesis.put_record(Data='{"%s": 1}' % lambda_integration.MSG_BODY_RAISE_ERROR_FLAG, + PartitionKey='testIdError', StreamName=TEST_LAMBDA_SOURCE_STREAM_NAME) # create SNS topic, connect it to the Lambda, publish test messages num_events_sns = 3 response = sns.create_topic(Name=TEST_TOPIC_NAME) sns.subscribe(TopicArn=response['TopicArn'], Protocol='lambda', - Endpoint=aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_STREAM)) - for i in range(0, num_events_sns): + Endpoint=aws_stack.lambda_function_arn(lambda_stream_name)) + for i in range(num_events_sns): sns.publish(TopicArn=response['TopicArn'], Subject='test_subject', Message='test message %s' % i) # get latest records @@ -308,22 +311,16 @@ def check_events(): # check cloudwatch notifications def check_cw_invocations(): - num_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM) - # TODO: It seems that CloudWatch is currently reporting an incorrect number of - # invocations, namely the sum over *all* lambdas, not the single one we're asking for. - # Also, we need to bear in mind that Kinesis may perform batch updates, i.e., a single - # Lambda invocation may happen with a set of Kinesis records, hence we cannot simply - # add num_events_ddb to num_events_lambda above! - # self.assertEqual(num_invocations, 2 + num_events_lambda) - self.assertGreater(num_invocations, num_events_sns + num_events_sqs) - num_error_invocations = get_lambda_invocations_count(TEST_LAMBDA_NAME_STREAM, 'Errors', 15) - self.assertEqual(num_error_invocations, 1) + num_invocations = get_lambda_invocations_count(lambda_stream_name) + self.assertEqual(num_invocations, num_events_kinesis + num_events_kinesis_err + num_events_sns) + num_error_invocations = get_lambda_invocations_count(lambda_stream_name, 'Errors') + self.assertEqual(num_error_invocations, num_events_kinesis_err) # Lambda invocations are running asynchronously, hence sleep some time here to wait for results retry(check_cw_invocations, retries=5, sleep=2) # clean up - testutil.delete_lambda_function(TEST_LAMBDA_NAME_STREAM) + testutil.delete_lambda_function(lambda_stream_name) testutil.delete_lambda_function(TEST_LAMBDA_NAME_DDB) testutil.delete_lambda_function(TEST_LAMBDA_NAME_QUEUE) sqs.delete_queue(QueueUrl=sqs_queue_info['QueueUrl']) @@ -626,18 +623,18 @@ def get_event_source_arn(stream_name): return kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['StreamARN'] -def get_lambda_invocations_count(lambda_name, metric=None, period=60, start_time=None, end_time=None): +def get_lambda_invocations_count(lambda_name, metric=None, period=None, start_time=None, end_time=None): metric = get_lambda_metrics(lambda_name, metric, period, start_time, end_time) if not metric['Datapoints']: return 0 return metric['Datapoints'][-1]['Sum'] -def get_lambda_metrics(func_name, metric=None, period=60, start_time=None, end_time=None): +def get_lambda_metrics(func_name, metric=None, period=None, start_time=None, end_time=None): metric = metric or 'Invocations' cloudwatch = aws_stack.connect_to_service('cloudwatch') - if end_time is None: - end_time = datetime.now() + period = period or 600 + end_time = end_time or datetime.now() if start_time is None: start_time = end_time - timedelta(seconds=period) return cloudwatch.get_metric_statistics(