Merge pull request #11591 from Azure/v-sabiraj-crowdstrikeadditionallogging

Updated Crowdstrike Connector to handle Json decode validation
v-prasadboke authored Jan 16, 2025
2 parents 24ae446 + 7a335fb commit 1adca51
Showing 2 changed files with 58 additions and 48 deletions.
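The substantive change is in customize_event below: json.loads is now wrapped in a try/except json.JSONDecodeError, and a line that fails to parse is logged and skipped (the function returns None and the callers ignore it) instead of letting a ValueError escape from process_file. A minimal standalone sketch of that pattern, for illustration only — the parse_event helper and the sample lines are not part of the connector:

import json
import logging

def parse_event(line):
    """Return the parsed JSON object, or None if the line is not valid JSON."""
    try:
        return json.loads(line)
    except json.JSONDecodeError as e:
        # Log and skip rather than raise, mirroring the updated connector behaviour
        logging.error("JSON decoding error for line: %s. Error: %s", line, e)
        return None

lines = ['{"timestamp": 1705363200, "aid": "abc123"}', 'not-json', '']
events = [e for e in (parse_event(l) for l in lines) if e is not None]
print(events)  # only the well-formed first line survives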
Binary file not shown.
@@ -44,39 +44,47 @@
drop_files_array = []
failed_files_array = []


def _create_sqs_client():
sqs_session = get_session()
return sqs_session.create_client(
- 'sqs',
- region_name=AWS_REGION_NAME,
- aws_access_key_id=AWS_KEY,
- aws_secret_access_key=AWS_SECRET
- )
+ 'sqs',
+ region_name=AWS_REGION_NAME,
+ aws_access_key_id=AWS_KEY,
+ aws_secret_access_key=AWS_SECRET
+ )


def _create_s3_client():
s3_session = get_session()
- boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries = {'max_attempts': 10, 'mode': 'standard'})
+ boto_config = BotoCoreConfig(region_name=AWS_REGION_NAME, retries={'max_attempts': 10, 'mode': 'standard'})
return s3_session.create_client(
- 's3',
- region_name=AWS_REGION_NAME,
- aws_access_key_id=AWS_KEY,
- aws_secret_access_key=AWS_SECRET,
- config=boto_config
- )
+ 's3',
+ region_name=AWS_REGION_NAME,
+ aws_access_key_id=AWS_KEY,
+ aws_secret_access_key=AWS_SECRET,
+ config=boto_config
+ )


def customize_event(line):
- element = json.loads(line)
+ try:
+ element = json.loads(line)  # Attempt to parse the line as JSON
+ except json.JSONDecodeError as e:
+ # Log the error and skip this line
+ logging.error(f"JSON decoding error for line: {line}. Error: {str(e)}")
+ return None  # Return None so that this line will be ignored during further processing
required_fileds = [
"timestamp", "aip", "aid", "EventType", "LogonType", "HostProcessType", "UserPrincipal", "DomainName",
"RemoteAddressIP", "ConnectionDirection", "TargetFileName", "LocalAddressIP4", "IsOnRemovableDisk",
"UserPrincipal", "UserIsAdmin", "LogonTime", "LogonDomain", "RemoteAccount", "UserId", "Prevalence",
"CurrentProcess", "ConnectionDirection", "event_simpleName", "TargetProcessId", "ProcessStartTime",
"UserName", "DeviceProductId", "TargetSHA256HashData", "SHA256HashData", "MD5HashData", "TargetDirectoryName",
"TargetFileName", "FirewallRule", "TaskName", "TaskExecCommand", "TargetAddress", "TargetProcessId",
"SourceFileName", "RegObjectName", "RegValueName", "ServiceObjectName", "RegistryPath", "RawProcessId",
"event_platform", "CommandLine", "ParentProcessId", "ParentCommandLine", "ParentBaseFileName",
"GrandParentBaseFileName", "RemotePort", "VolumeDeviceType", "VolumeName", "ClientComputerName", "ProductId", "ComputerName"
]
"timestamp", "aip", "aid", "EventType", "LogonType", "HostProcessType", "UserPrincipal", "DomainName",
"RemoteAddressIP", "ConnectionDirection", "TargetFileName", "LocalAddressIP4", "IsOnRemovableDisk",
"UserPrincipal", "UserIsAdmin", "LogonTime", "LogonDomain", "RemoteAccount", "UserId", "Prevalence",
"CurrentProcess", "ConnectionDirection", "event_simpleName", "TargetProcessId", "ProcessStartTime",
"UserName", "DeviceProductId", "TargetSHA256HashData", "SHA256HashData", "MD5HashData", "TargetDirectoryName",
"TargetFileName", "FirewallRule", "TaskName", "TaskExecCommand", "TargetAddress", "TargetProcessId",
"SourceFileName", "RegObjectName", "RegValueName", "ServiceObjectName", "RegistryPath", "RawProcessId",
"event_platform", "CommandLine", "ParentProcessId", "ParentCommandLine", "ParentBaseFileName",
"GrandParentBaseFileName", "RemotePort", "VolumeDeviceType", "VolumeName", "ClientComputerName", "ProductId", "ComputerName"
]
required_fields_data = {}
custom_fields_data = {}
for key, value in element.items():
@@ -90,6 +98,7 @@ def customize_event(line):
event["custom_fields_message"] = custom_fields_data_text
return event


def sort_files_by_bucket(array_obj):
array_obj = sorted(array_obj, key=itemgetter('bucket'))
sorted_array = []
@@ -100,6 +109,7 @@ def sort_files_by_bucket(array_obj):
sorted_array.append({'bucket': key, 'files': temp_array})
return sorted_array


async def main(mytimer: func.TimerRequest):
global drop_files_array, failed_files_array
drop_files_array.clear()
@@ -131,7 +141,8 @@ async def main(mytimer: func.TimerRequest):
if 'Messages' in response:
for msg in response['Messages']:
body_obj = json.loads(msg["Body"])
logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
logging.info("Got message with MessageId {}. Start processing {} files from Bucket: {}. Path prefix: {}. Timestamp: {}.".format(
msg["MessageId"], body_obj["fileCount"], body_obj["bucket"], body_obj["pathPrefix"], body_obj["timestamp"]))
await download_message_files(body_obj, session, retrycount=0)
logging.info("Finished processing {} files from MessageId {}. Bucket: {}. Path prefix: {}".format(body_obj["fileCount"], msg["MessageId"], body_obj["bucket"], body_obj["pathPrefix"]))
try:
@@ -151,19 +162,20 @@ async def main(mytimer: func.TimerRequest):

if len(failed_files_array) > 0:
logging.info("list of files that were not processed after defined no. of retries: {}".format(failed_files_array))



async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
async with semaphore:
total_events = 0
logging.info("Start processing file {}".format(s3_path))
sentinel = AzureSentinelConnectorAsync(
- session,
- LOG_ANALYTICS_URI,
- WORKSPACE_ID,
- SHARED_KEY,
- LOG_TYPE,
- queue_size=MAX_BUCKET_SIZE
- )
+ session,
+ LOG_ANALYTICS_URI,
+ WORKSPACE_ID,
+ SHARED_KEY,
+ LOG_TYPE,
+ queue_size=MAX_BUCKET_SIZE
+ )
try:
response = await client.get_object(Bucket=bucket, Key=s3_path)
s = ''
@@ -173,36 +185,34 @@ async def process_file(bucket, s3_path, client, semaphore, session, retrycount):
for n, line in enumerate(lines):
if n < len(lines) - 1:
if line:
- try:
- event = customize_event(line)
- except ValueError as e:
- logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
- raise e
+ event = customize_event(line)
+ if event is None:  # Skip malformed lines
+ continue
await sentinel.send(event)
s = line
if s:
- try:
- event = customize_event(line)
- except ValueError as e:
- logging.error('Error while loading json Event at s value {}. Error: {}'.format(line, str(e)))
- raise e
+ event = customize_event(line)
+ if event is None:  # Skip malformed lines
+ return
await sentinel.send(event)
await sentinel.flush()
total_events += sentinel.successfull_sent_events_number
logging.info("Finish processing file {}. Sent events: {}".format(s3_path, sentinel.successfull_sent_events_number))
logging.info("Finish processing file {}. Sent events: {}".format(
s3_path, sentinel.successfull_sent_events_number))
except Exception as e:
- if(retrycount<=0):
- logging.warn("Processing file {} was failed. Error: {}".format(s3_path,e))
+ if (retrycount <= 0):
+ logging.warn("Processing file {} was failed. Error: {}".format(s3_path, e))
drop_files_array.append({'bucket': bucket, 'path': s3_path})
else:
logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path,e))
logging.warn("Processing file {} was failed after defined no. of retries. Error: {}".format(s3_path, e))
failed_files_array.append({'bucket': bucket, 'path': s3_path})


async def download_message_files(msg, session, retrycount):
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
async with _create_s3_client() as client:
cors = []
for s3_file in msg['files']:
- cors.append(process_file(msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
+ cors.append(process_file(
+ msg['bucket'], s3_file['path'], client, semaphore, session, retrycount))
await asyncio.gather(*cors)
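For context on the unchanged fan-out logic above: download_message_files builds one process_file coroutine per S3 object listed in the SQS message, bounds concurrency with asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES), and awaits the batch with asyncio.gather. A rough, self-contained sketch of that pattern — the worker body, the paths, and the limit of 3 are placeholders, not the connector's real values:

import asyncio

MAX_CONCURRENT_PROCESSING_FILES = 3  # placeholder limit for this sketch

async def process_file(path, semaphore):
    async with semaphore:            # at most N files are in flight at once
        await asyncio.sleep(0.1)     # stands in for "download from S3 and send to Sentinel"
        print(f"processed {path}")

async def download_message_files(paths):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSING_FILES)
    tasks = [process_file(p, semaphore) for p in paths]
    await asyncio.gather(*tasks)

asyncio.run(download_message_files([f"data/part-{i}.gz" for i in range(10)]))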
