From 7a335fb07f51620b627144fb66c643a80d5858b7 Mon Sep 17 00:00:00 2001
From: v-sabiraj
Date: Mon, 23 Dec 2024 16:53:41 +0530
Subject: [PATCH] Update __init__.py

---
 .../__init__.py | 50 +++++--------------
 1 file changed, 12 insertions(+), 38 deletions(-)

diff --git a/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py b/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py
index a95120c9782..fe1c5864a05 100644
--- a/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py
+++ b/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py
@@ -24,13 +24,11 @@
 AWS_REGION_NAME = os.environ['AWS_REGION_NAME']
 QUEUE_URL = os.environ['QUEUE_URL']
 VISIBILITY_TIMEOUT = 1800
-LINE_SEPARATOR = os.environ.get(
-    'lineSeparator', '[\n\r\x0b\v\x0c\f\x1c\x1d\x85\x1e\u2028\u2029]+')
+LINE_SEPARATOR = os.environ.get('lineSeparator', '[\n\r\x0b\v\x0c\f\x1c\x1d\x85\x1e\u2028\u2029]+')
 connection_string = os.environ['AzureWebJobsStorage']
 
 # Defines how many files can be processed simultaneously
-MAX_CONCURRENT_PROCESSING_FILES = int(
-    os.environ.get('SimultaneouslyProcessingFiles', 20))
+MAX_CONCURRENT_PROCESSING_FILES = int(os.environ.get('SimultaneouslyProcessingFiles', 20))
 
 # Defines max number of events that can be sent in one request to Microsoft Sentinel
 MAX_BUCKET_SIZE = int(os.environ.get('EventsBucketSize', 2000))
@@ -59,8 +57,7 @@ def _create_sqs_client():
 
 def _create_s3_client():
     s3_session = get_session()
-    boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={
-                                 'max_attempts': 10, 'mode': 'standard'})
+    boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={'max_attempts': 10, 'mode': 'standard'})
     return s3_session.create_client(
         's3',
         region_name=AWS_REGION_NAME,
@@ -119,8 +116,7 @@ async def main(mytimer: func.TimerRequest):
     failed_files_array.clear()
     script_start_time = int(time.time())
     filepath = 'drop_files_array_file'
-    state = StateManager(connection_string=connection_string,
-                         share_name='funcstatemarkershare', file_path=filepath)
+    state = StateManager(connection_string=connection_string, share_name='funcstatemarkershare', file_path=filepath)
     last_dropped_messages = state.get()
     last_dropped_messages_obj = ''
     if last_dropped_messages != None and last_dropped_messages != '':
@@ -131,10 +127,8 @@
     async with _create_sqs_client() as client:
         async with aiohttp.ClientSession() as session:
             if len(last_dropped_messages_obj) > 0:
-                logging.info(
-                    "Processing files which added to re-processing. Files: {}".format(last_dropped_messages_obj))
-                last_dropped_messages_obj_sorted = sort_files_by_bucket(
-                    last_dropped_messages_obj)
+                logging.info("Processing files which added to re-processing. Files: {}".format(last_dropped_messages_obj))
+                last_dropped_messages_obj_sorted = sort_files_by_bucket(last_dropped_messages_obj)
                 for reprocessing_file_msg in last_dropped_messages_obj_sorted:
                     await download_message_files(reprocessing_file_msg, session, retrycount=1)
             logging.info('Trying to check messages off the queue...')
@@ -150,38 +144,30 @@
                         logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(
                             msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
                         await download_message_files(body_obj, session, retrycount=0)
-                        logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(
-                            body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
+                        logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
                         try:
                             await client.delete_message(
                                 QueueUrl=QUEUE_URL,
                                 ReceiptHandle=msg['ReceiptHandle']
                             )
                         except Exception as e:
-                            logging.error("Error during deleting message with MessageId {} from queue. Bucket: {}. Path prefix: {}. Error: {}".format(
-                                msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"], e))
+                            logging.error("Error during deleting message with MessageId {} from queue. Bucket: {}. Path prefix: {}. Error: {}".format(msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"], e))
                 else:
                     logging.info('No messages in queue. Re-trying to check...')
         except KeyboardInterrupt:
             pass
 
     if len(drop_files_array) > 0:
-        logging.info(
-            "list of files that were not processed: {}".format(drop_files_array))
+        logging.info("list of files that were not processed: {}".format(drop_files_array))
         state.post(str(json.dumps(drop_files_array)))
     if len(failed_files_array) > 0:
-        logging.info("list of files that were not processed after defined no. of retries: {}".format(
-            failed_files_array))
+        logging.info("list of files that were not processed after defined no. of retries: {}".format(failed_files_array))
 
 
 async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
     async with semaphore:
-        logging.info(
-            "retrycount before total_events: {}".format(retrycount))
         total_events = 0
         logging.info("Start processing file {}".format(s3_path))
-        logging.info(
-            "retrycount before sentinel: {}".format(retrycount))
         sentinel = AzureSentinelConnectorAsync(
             session,
             LOG_ANALYTICS_URI,
@@ -192,8 +178,6 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
         )
         try:
             response = await client.get_object(Bucket=bucket, Key=s3_path)
-            logging.info(
-                "retrycount after response: {}".format(retrycount))
             s = ''
             async for decompressed_chunk in AsyncGZIPDecompressedStream(response["Body"]):
                 s += decompressed_chunk.decode(errors='ignore')
@@ -216,29 +200,19 @@
             logging.info("Finish processing file {}. Sent events: {}".format(
                 s3_path, sentinel.successfull_sent_events_number))
         except Exception as e:
-            logging.info(
-                "retrycount in exception: {}".format(retrycount))
             if (retrycount <= 0):
-                logging.warn(
-                    "Processing file {} was failed. Error: {}".format(s3_path, e))
+                logging.warn("Processing file {} was failed. Error: {}".format(s3_path, e))
                 drop_files_array.append({'bucket': bucket, 'path': s3_path})
             else:
-                logging.warn(
-                    "Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
+                logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
                 failed_files_array.append({'bucket': bucket, 'path': s3_path})
 
 
 async def download_message_files(msg, session, retrycount):
-    logging.info(
-        "retrycount before semaphore: {}".format(retrycount))
     semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
-    logging.info(
-        "retrycount after semaphore: {}".format(retrycount))
     async with _create_s3_client() as client:
         cors = []
         for s3_file in msg['files']:
-            logging.info(
-                "retrycount before process file: {}".format(retrycount))
             cors.append(process_file(
                 msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
         await asyncio.gather(*cors)