diff --git a/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConn.zip b/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConn.zip
index ce530240be5..808917108da 100644
Binary files a/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConn.zip and b/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConn.zip differ
diff --git a/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py b/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py
index 9ec0a2c6517..fe1c5864a05 100644
--- a/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py
+++ b/Solutions/CrowdStrike Falcon Endpoint Protection/Data Connectors/CrowdstrikeReplicator/CrowdstrikeFalconAPISentinelConnector/__init__.py
@@ -44,39 +44,47 @@
 drop_files_array = []
 failed_files_array = []
 
+
 def _create_sqs_client():
     sqs_session = get_session()
     return sqs_session.create_client(
-                    'sqs',
-                    region_name=AWS_REGION_NAME,
-                    aws_access_key_id=AWS_KEY,
-                    aws_secret_access_key=AWS_SECRET
-                    )
+        'sqs',
+        region_name=AWS_REGION_NAME,
+        aws_access_key_id=AWS_KEY,
+        aws_secret_access_key=AWS_SECRET
+    )
+
 
 def _create_s3_client():
     s3_session = get_session()
-    boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries = {'max_attempts': 10, 'mode': 'standard'})
+    boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={'max_attempts': 10, 'mode': 'standard'})
     return s3_session.create_client(
-                    's3',
-                    region_name=AWS_REGION_NAME,
-                    aws_access_key_id=AWS_KEY,
-                    aws_secret_access_key=AWS_SECRET,
-                    config=boto_config
-                    )
+        's3',
+        region_name=AWS_REGION_NAME,
+        aws_access_key_id=AWS_KEY,
+        aws_secret_access_key=AWS_SECRET,
+        config=boto_config
+    )
+
 
 def customize_event(line):
-    element = json.loads(line)
+    try:
+        element = json.loads(line)  # Attempt to parse the line as JSON
+    except json.JSONDecodeError as e:
+        # Log the error and skip this line
+        logging.error(f"JSON decoding error for line: {line}. Error: {str(e)}")
+        return None  # Return None so that this line will be ignored during further processing
     required_fileds = [
-                "timestamp", "aip", "aid", "EventType", "LogonType", "HostProcessType", "UserPrincipal", "DomainName",
-                "RemoteAddressIP", "ConnectionDirection", "TargetFileName", "LocalAddressIP4", "IsOnRemovableDisk",
-                "UserPrincipal", "UserIsAdmin", "LogonTime", "LogonDomain", "RemoteAccount", "UserId", "Prevalence",
-                "CurrentProcess", "ConnectionDirection", "event_simpleName", "TargetProcessId", "ProcessStartTime",
-                "UserName", "DeviceProductId", "TargetSHA256HashData", "SHA256HashData", "MD5HashData", "TargetDirectoryName",
-                "TargetFileName", "FirewallRule", "TaskName", "TaskExecCommand", "TargetAddress", "TargetProcessId",
-                "SourceFileName", "RegObjectName", "RegValueName", "ServiceObjectName", "RegistryPath", "RawProcessId",
-                "event_platform", "CommandLine", "ParentProcessId", "ParentCommandLine", "ParentBaseFileName",
-                "GrandParentBaseFileName", "RemotePort", "VolumeDeviceType", "VolumeName", "ClientComputerName", "ProductId", "ComputerName"
-                ]
+        "timestamp", "aip", "aid", "EventType", "LogonType", "HostProcessType", "UserPrincipal", "DomainName",
+        "RemoteAddressIP", "ConnectionDirection", "TargetFileName", "LocalAddressIP4", "IsOnRemovableDisk",
+        "UserPrincipal", "UserIsAdmin", "LogonTime", "LogonDomain", "RemoteAccount", "UserId", "Prevalence",
+        "CurrentProcess", "ConnectionDirection", "event_simpleName", "TargetProcessId", "ProcessStartTime",
+        "UserName", "DeviceProductId", "TargetSHA256HashData", "SHA256HashData", "MD5HashData", "TargetDirectoryName",
+        "TargetFileName", "FirewallRule", "TaskName", "TaskExecCommand", "TargetAddress", "TargetProcessId",
+        "SourceFileName", "RegObjectName", "RegValueName", "ServiceObjectName", "RegistryPath", "RawProcessId",
+        "event_platform", "CommandLine", "ParentProcessId", "ParentCommandLine", "ParentBaseFileName",
+        "GrandParentBaseFileName", "RemotePort", "VolumeDeviceType", "VolumeName", "ClientComputerName", "ProductId", "ComputerName"
+    ]
     required_fields_data = {}
     custom_fields_data = {}
     for key, value in element.items():
@@ -90,6 +98,7 @@ def customize_event(line):
     event["custom_fields_message"] = custom_fields_data_text
     return event
 
+
 def sort_files_by_bucket(array_obj):
     array_obj = sorted(array_obj, key=itemgetter('bucket'))
     sorted_array = []
@@ -100,6 +109,7 @@ def sort_files_by_bucket(array_obj):
         sorted_array.append({'bucket': key, 'files': temp_array})
     return sorted_array
 
+
 async def main(mytimer: func.TimerRequest):
     global drop_files_array, failed_files_array
     drop_files_array.clear()
@@ -131,7 +141,8 @@ async def main(mytimer: func.TimerRequest):
             if 'Messages' in response:
                 for msg in response['Messages']:
                     body_obj = json.loads(msg["Body"])
-                    logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
+                    logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(
+                        msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
                     await download_message_files(body_obj, session, retrycount=0)
                     logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
                     try:
@@ -151,19 +162,20 @@ async def main(mytimer: func.TimerRequest):
 
     if len(failed_files_array) > 0:
         logging.info("list of files that were not processed after defined no. of retries: {}".format(failed_files_array))
-    
+
+
 async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
     async with semaphore:
         total_events = 0
         logging.info("Start processing file {}".format(s3_path))
         sentinel = AzureSentinelConnectorAsync(
-                        session,
-                        LOG_ANALYTICS_URI,
-                        WORKSPACE_ID,
-                        SHARED_KEY,
-                        LOG_TYPE,
-                        queue_size=MAX_BUCKET_SIZE
-                        )
+            session,
+            LOG_ANALYTICS_URI,
+            WORKSPACE_ID,
+            SHARED_KEY,
+            LOG_TYPE,
+            queue_size=MAX_BUCKET_SIZE
+        )
         try:
             response = await client.get_object(Bucket=bucket, Key=s3_path)
             s = ''
@@ -173,36 +185,34 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
                 for n, line in enumerate(lines):
                     if n < len(lines) - 1:
                         if line:
-                            try:
-                                event = customize_event(line)
-                            except ValueError as e:
-                                logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
-                                raise e
+                            event = customize_event(line)
+                            if event is None:  # Skip malformed lines
+                                continue
                             await sentinel.send(event)
                     s = line
             if s:
-                try:
-                    event = customize_event(line)
-                except ValueError as e:
-                    logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
-                    raise e
+                event = customize_event(line)
+                if event is None:  # Skip malformed lines
+                    return
                 await sentinel.send(event)
             await sentinel.flush()
             total_events += sentinel.successfull_sent_events_number
-            logging.info("Finish processing file {}. Sent events: {}".format(s3_path, sentinel.successfull_sent_events_number))
+            logging.info("Finish processing file {}. Sent events: {}".format(
+                s3_path, sentinel.successfull_sent_events_number))
        except Exception as e:
-            if(retrycount<=0):
-                logging.warn("Processing file {} was failed. Error: {}".format(s3_path,e))
+            if (retrycount <= 0):
+                logging.warn("Processing file {} was failed. Error: {}".format(s3_path, e))
                 drop_files_array.append({'bucket': bucket, 'path': s3_path})
             else:
-                logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path,e))
+                logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
                 failed_files_array.append({'bucket': bucket, 'path': s3_path})
-    
+
 
 async def download_message_files(msg, session, retrycount):
     semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
     async with _create_s3_client() as client:
         cors = []
         for s3_file in msg['files']:
-            cors.append(process_file(msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
+            cors.append(process_file(
+                msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
         await asyncio.gather(*cors)