
Commit

Update __init__.py
v-sabiraj committed Dec 23, 2024
1 parent 222bc2c commit 7a335fb
Showing 1 changed file with 12 additions and 38 deletions.
@@ -24,13 +24,11 @@
AWS_REGION_NAME = os.environ['AWS_REGION_NAME']
QUEUE_URL = os.environ['QUEUE_URL']
VISIBILITY_TIMEOUT = 1800
-LINE_SEPARATOR = os.environ.get(
-'lineSeparator', '[\n\r\x0b\v\x0c\f\x1c\x1d\x85\x1e\u2028\u2029]+')
+LINE_SEPARATOR = os.environ.get('lineSeparator', '[\n\r\x0b\v\x0c\f\x1c\x1d\x85\x1e\u2028\u2029]+')
connection_string = os.environ['AzureWebJobsStorage']

# Defines how many files can be processed simultaneously
-MAX_CONCURRENT_PROCESSING_FILES = int(
-os.environ.get('SimultaneouslyProcessingFiles', 20))
+MAX_CONCURRENT_PROCESSING_FILES = int(os.environ.get('SimultaneouslyProcessingFiles', 20))

# Defines max number of events that can be sent in one request to Microsoft Sentinel
MAX_BUCKET_SIZE = int(os.environ.get('EventsBucketSize', 2000))
@@ -59,8 +57,7 @@ def _create_sqs_client():

def _create_s3_client():
s3_session = get_session()
-boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={
-'max_attempts': 10, 'mode': 'standard'})
+boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={'max_attempts': 10, 'mode': 'standard'})
return s3_session.create_client(
's3',
region_name=AWS_REGION_NAME,
@@ -119,8 +116,7 @@ async def main(mytimer: func.TimerRequest):
failed_files_array.clear()
script_start_time = int(time.time())
filepath = 'drop_files_array_file'
-state = StateManager(connection_string=connection_string,
-share_name='funcstatemarkershare', file_path=filepath)
+state = StateManager(connection_string=connection_string, share_name='funcstatemarkershare', file_path=filepath)
last_dropped_messages = state.get()
last_dropped_messages_obj = ''
if last_dropped_messages != None and last_dropped_messages != '':
@@ -131,10 +127,8 @@
async with _create_sqs_client() as client:
async with aiohttp.ClientSession() as session:
if len(last_dropped_messages_obj) > 0:
-logging.info(
-"Processing files which added to re-processing. Files: {}".format(last_dropped_messages_obj))
-last_dropped_messages_obj_sorted = sort_files_by_bucket(
-last_dropped_messages_obj)
+logging.info("Processing files which added to re-processing. Files: {}".format(last_dropped_messages_obj))
+last_dropped_messages_obj_sorted = sort_files_by_bucket(last_dropped_messages_obj)
for reprocessing_file_msg in last_dropped_messages_obj_sorted:
await download_message_files(reprocessing_file_msg, session, retrycount=1)
logging.info('Trying to check messages off the queue...')
@@ -150,38 +144,30 @@
logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(
msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
await download_message_files(body_obj, session, retrycount=0)
-logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(
-body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
+logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
try:
await client.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=msg['ReceiptHandle']
)
except Exception as e:
-logging.error("Error during deleting message with MessageId {} from queue. Bucket: {}. Path prefix: {}. Error: {}".format(
-msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"], e))
+logging.error("Error during deleting message with MessageId {} from queue. Bucket: {}. Path prefix: {}. Error: {}".format(msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"], e))
else:
logging.info('No messages in queue. Re-trying to check...')
except KeyboardInterrupt:
pass
if len(drop_files_array) > 0:
-logging.info(
-"list of files that were not processed: {}".format(drop_files_array))
+logging.info("list of files that were not processed: {}".format(drop_files_array))
state.post(str(json.dumps(drop_files_array)))

if len(failed_files_array) > 0:
-logging.info("list of files that were not processed after defined no. of retries: {}".format(
-failed_files_array))
+logging.info("list of files that were not processed after defined no. of retries: {}".format(failed_files_array))


async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
async with semaphore:
-logging.info(
-"retrycount before total_events: {}".format(retrycount))
total_events = 0
logging.info("Start processing file {}".format(s3_path))
-logging.info(
-"retrycount before sentinel: {}".format(retrycount))
sentinel = AzureSentinelConnectorAsync(
session,
LOG_ANALYTICS_URI,
@@ -192,8 +178,6 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
)
try:
response = await client.get_object(Bucket=bucket, Key=s3_path)
-logging.info(
-"retrycount after response: {}".format(retrycount))
s = ''
async for decompressed_chunk in AsyncGZIPDecompressedStream(response["Body"]):
s += decompressed_chunk.decode(errors='ignore')
@@ -216,29 +200,19 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
logging.info("Finish processing file {}. Sent events: {}".format(
s3_path, sentinel.successfull_sent_events_number))
except Exception as e:
-logging.info(
-"retrycount in exception: {}".format(retrycount))
if (retrycount <= 0):
-logging.warn(
-"Processing file {} was failed. Error: {}".format(s3_path, e))
+logging.warn("Processing file {} was failed. Error: {}".format(s3_path, e))
drop_files_array.append({'bucket': bucket, 'path': s3_path})
else:
-logging.warn(
-"Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
+logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
failed_files_array.append({'bucket': bucket, 'path': s3_path})


async def download_message_files(msg, session, retrycount):
-logging.info(
-"retrycount before semaphore: {}".format(retrycount))
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
-logging.info(
-"retrycount after semaphore: {}".format(retrycount))
async with _create_s3_client() as client:
cors = []
for s3_file in msg['files']:
-logging.info(
-"retrycount before process file: {}".format(retrycount))
cors.append(process_file(
msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
await asyncio.gather(*cors)
