Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

<<Do not Merge>> Added additional logs for Snowflake. #7452

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def main(mytimer: func.TimerRequest):
state_manager_queries = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_queries')
state_manager_rlogins = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_rlogins')
state_manager_rqueries = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_rqueries')
state_manager_grantstousers = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_grantstousers')
state_manager_datatransferhistory = StateManager(FILE_SHARE_CONN_STRING, file_path='snowflake_datatransferhistory')

logins_date_from = state_manager_logins.get()
logins_date_from = parse_date_from(logins_date_from)
Expand Down Expand Up @@ -138,6 +140,49 @@ def main(mytimer: func.TimerRequest):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return


grantstousers_date_from = state_manager_grantstousers.get()
grantstousers_date_from = parse_date_from(grantstousers_date_from)
logging.info(f'Getting GRANTS TO USERS events from {grantstousers_date_from}')
last_ts = None
for event in get_grant_users_events(ctx, grantstousers_date_from):
sentinel.send(event)
last_ts = event.get('CREATED_ON')
if sentinel.is_empty() and last_ts:
state_manager_grantstousers.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return
sentinel.flush()
if last_ts:
state_manager_grantstousers.post(last_ts)

if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return

datatransferhistory_date_from = state_manager_datatransferhistory.get()
datatransferhistory_date_from = parse_date_from(datatransferhistory_date_from)
logging.info(f'Getting DATATRANSFER HISTORY events from {datatransferhistory_date_from}')
last_ts = None
for event in get_datatransfer_events(ctx, datatransferhistory_date_from):
sentinel.send(event)
last_ts = event.get("START_TIME")
if sentinel.is_empty() and last_ts:
state_manager_datatransferhistory.post(last_ts)
if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return

sentinel.flush()
if last_ts:
state_manager_logins.post(last_ts)
state_manager_datatransferhistory.post(last_ts)

if check_if_script_runs_too_long(script_start_time):
logging.info(f'Script is running too long. Stop processing new events. Finish script. Sent events: {sentinel.successfull_sent_events_number}')
return

ctx.close()
logging.info(f'Script finished. Sent events: {sentinel.successfull_sent_events_number}')

Expand Down Expand Up @@ -199,6 +244,29 @@ def get_reader_query_events(ctx: snowflake.connector.SnowflakeConnection, date_f
finally:
cs.close()

def get_grant_users_events(ctx: snowflake.connector.SnowflakeConnection, date_from: datetime.datetime) -> Iterable[dict]:
cs = ctx.cursor(DictCursor)
try:
cs.execute("use schema snowflake.account_usage")
cs.execute(f"SELECT * from GRANTS_TO_USERS WHERE CREATED_ON > '{date_from.isoformat()}' ORDER BY CREATED_ON ASC")
for row in cs:
row = parse_grant_users_event(row)
yield row
finally:
cs.close()


def get_datatransfer_events(ctx: snowflake.connector.SnowflakeConnection, date_from: datetime.datetime) -> Iterable[dict]:
cs = ctx.cursor(DictCursor)
try:
cs.execute("use schema snowflake.account_usage")
cs.execute(f"SELECT * from DATA_TRANSFER_HISTORY WHERE START_TIME > '{date_from.isoformat()}' ORDER BY START_TIME ASC")
for row in cs:
row = parse_datatransfer_event(row)
yield row
finally:
cs.close()


def parse_login_event(event: dict) -> dict:
if 'EVENT_TIMESTAMP' in event and isinstance(event['EVENT_TIMESTAMP'], datetime.datetime):
Expand All @@ -215,6 +283,23 @@ def parse_query_event(event: dict) -> dict:
event['source_table'] = 'QUERY_HISTORY'
return event

def parse_grant_users_event(event: dict) -> dict:
if 'CREATED_ON' in event and isinstance(event['CREATED_ON'], datetime.datetime):
event['CREATED_ON'] = event['CREATED_ON'].isoformat()
if 'DELETED_ON' in event and isinstance(event['DELETED_ON'], datetime.datetime):
event['DELETED_ON'] = event['DELETED_ON'].isoformat()
event['source_table'] = 'GRANTS_TO_USERS'
return event


def parse_datatransfer_event(event: dict) -> dict:
if 'START_TIME' in event and isinstance(event['START_TIME'], datetime.datetime):
event['START_TIME'] = event['START_TIME'].isoformat()
if 'END_TIME' in event and isinstance(event['END_TIME'], datetime.datetime):
event['END_TIME'] = event['END_TIME'].isoformat()
event['source_table'] = 'DATA_TRANSFER_HISTORY'
return event


def check_if_script_runs_too_long(script_start_time: int) -> bool:
now = int(time.time())
Expand Down
Binary file modified Solutions/Snowflake/Data Connectors/SnowflakeConn.zip
Binary file not shown.