Merge pull request #1480 from bcgov/feature/ALCS-1706
Notification Documents
lstod authored Mar 6, 2024
2 parents 6635d63 + d09e0e4 commit 2a2868b
Showing 17 changed files with 552 additions and 1 deletion.
3 changes: 2 additions & 1 deletion bin/migrate-oats-data/documents/__init__.py
@@ -2,4 +2,5 @@
from .oats_documents_to_alcs_documents_app import *
from .alcs_documents_to_noi_documents import *
from .oats_documents_to_alcs_documents_noi import *
from .document_source_update import update_document_source
from .post_launch import *
1 change: 1 addition & 0 deletions bin/migrate-oats-data/documents/post_launch/__init__.py
@@ -0,0 +1 @@
from .migrate_documents import *
144 changes: 144 additions & 0 deletions bin/migrate-oats-data/documents/post_launch/alcs_documents_to_notification_documents.py
@@ -0,0 +1,144 @@
from common import (
setup_and_get_logger,
BATCH_UPLOAD_SIZE,
OATS_ETL_USER,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor

etl_name = "link_srw_documents_from_alcs"
logger = setup_and_get_logger(etl_name)

"""
This script connects to postgress version of OATS DB and links data from ALCS documents to ALCS notification_document table.
NOTE:
Before performing document import you need to import SRWs and SRW documents.
"""


@inject_conn_pool
def link_srw_documents(conn=None, batch_size=BATCH_UPLOAD_SIZE):
"""
function uses a decorator pattern @inject_conn_pool to inject a database connection pool to the function. It fetches the total count of documents and prints it to the console. Then, it fetches the documents to insert in batches using document IDs, constructs an insert query, and processes them.
"""
logger.info(f"Start {etl_name}")
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with open(
"documents/post_launch/sql/alcs_documents_to_notification_documents_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
total_count = dict(cursor.fetchone())["count"]
logger.info(f"Total count of documents to transfer: {total_count}")

failed_inserts_count = 0
successful_inserts_count = 0
last_document_id = 0

with open(
"documents/post_launch/sql/alcs_documents_to_notification_documents.sql",
"r",
encoding="utf-8",
) as sql_file:
documents_to_insert_sql = sql_file.read()
while True:
cursor.execute(
f"{documents_to_insert_sql} WHERE oats_document_id > {last_document_id} ORDER BY oats_document_id;"
)
rows = cursor.fetchmany(batch_size)
if not rows:
break
try:
documents_to_be_inserted_count = len(rows)

_insert_records(conn, cursor, rows)

last_document_id = dict(rows[-1])["oats_document_id"]
successful_inserts_count = (
successful_inserts_count + documents_to_be_inserted_count
)

logger.debug(
f"retrieved/inserted items count: {documents_to_be_inserted_count}; total successfully inserted/updated documents so far {successful_inserts_count}; last inserted oats_document_id: {last_document_id}"
)
except Exception as e:
conn.rollback()
logger.exception(f"Error {e}")
failed_inserts_count += len(rows)
                    # Advance one id so the next fetch does not repeat the identical failing batch.
                    last_document_id = last_document_id + 1

logger.info(f"Total amount of successful inserts: {successful_inserts_count}")
logger.info(f"Total amount of failed inserts: {failed_inserts_count}")


def _insert_records(conn, cursor, rows):
number_of_rows_to_insert = len(rows)

if number_of_rows_to_insert > 0:
insert_query = _compile_insert_query(number_of_rows_to_insert)
rows_to_insert = _prepare_data_to_insert(rows)
cursor.execute(insert_query, rows_to_insert)
conn.commit()


def _compile_insert_query(number_of_rows_to_insert):
documents_to_insert = ",".join(["%s"] * number_of_rows_to_insert)
return f"""
INSERT INTO alcs.notification_document(
notification_uuid,
document_uuid,
type_code,
visibility_flags,
oats_document_id,
oats_application_id,
audit_created_by,
survey_plan_number,
control_number
)
VALUES{documents_to_insert}
ON CONFLICT (oats_document_id, oats_application_id) DO UPDATE SET
notification_uuid = EXCLUDED.notification_uuid,
document_uuid = EXCLUDED.document_uuid,
type_code = EXCLUDED.type_code,
visibility_flags = EXCLUDED.visibility_flags,
audit_created_by = EXCLUDED.audit_created_by,
survey_plan_number = EXCLUDED.survey_plan_number,
control_number = EXCLUDED.control_number;
"""


def _prepare_data_to_insert(rows):
    # Map each row, then collect its values as a tuple in insert-column order.
    data_to_insert = []
    for row in rows:
        mapped_row = _map_data(row)
        data_to_insert.append(tuple(mapped_row.values()))

    return data_to_insert


def _map_data(row):
    # Dict values are consumed positionally, so key order must match the
    # INSERT column list; "plan_number" feeds the survey_plan_number column.
    return {
"notification_uuid": row["notification_uuid"],
"document_uuid": row["document_uuid"],
"type_code": row["type_code"],
"visibility_flags": row["visibility_flags"],
"oats_document_id": row["oats_document_id"],
"oats_application_id": row["oats_application_id"],
"audit_created_by": OATS_ETL_USER,
"plan_number": row["plan_no"],
"control_number": row["control_no"],
}


@inject_conn_pool
def clean_notification_documents(conn=None):
logger.info("Start documents cleaning")
with conn.cursor() as cursor:
cursor.execute(
f"DELETE FROM alcs.notification_document WHERE audit_created_by = '{OATS_ETL_USER}';"
)
conn.commit()
logger.info(f"Deleted items count = {cursor.rowcount}")
18 changes: 18 additions & 0 deletions bin/migrate-oats-data/documents/post_launch/migrate_documents.py
@@ -0,0 +1,18 @@
from .oats_documents_to_alcs_documents_srw import (
import_oats_srw_documents,
document_clean,
)
from .alcs_documents_to_notification_documents import (
link_srw_documents,
clean_notification_documents,
)


def import_documents(batch_size):
import_oats_srw_documents(batch_size)
link_srw_documents(batch_size)


def clean_documents():
clean_notification_documents()
document_clean()
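
For context, a hypothetical way these entry points might be wired up from a migration runner (import_documents and clean_documents are from this diff; the __main__ wrapper and the batch size are illustrative):

from documents.post_launch import clean_documents, import_documents

if __name__ == "__main__":
    # Import SRW documents from OATS, then link them to notifications.
    import_documents(batch_size=1000)
    # To roll back everything this ETL created, run the cleaners instead:
    # clean_documents()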
189 changes: 189 additions & 0 deletions bin/migrate-oats-data/documents/post_launch/oats_documents_to_alcs_documents_srw.py
@@ -0,0 +1,189 @@
from common import (
setup_and_get_logger,
BATCH_UPLOAD_SIZE,
OATS_ETL_USER,
add_timezone_and_keep_date_part,
OatsToAlcsDocumentSourceCode,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor
import os

etl_name = "import_srw_documents_from_oats"
logger = setup_and_get_logger(etl_name)

"""
This script connects to postgress version of OATS DB and transfers data from OATS documents table to ALCS documents table.
NOTE:
Before performing document import you need to import SRWs from oats.
"""


@inject_conn_pool
def import_oats_srw_documents(conn=None, batch_size=BATCH_UPLOAD_SIZE):
"""
function uses a decorator pattern @inject_conn_pool to inject a database connection pool to the function. It fetches the total count of documents and prints it to the console. Then, it fetches the documents to insert in batches using document IDs, constructs an insert query, and processes them.
"""
logger.info(f"Start {etl_name}")
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with open(
"documents/post_launch/sql/oats_documents_to_alcs_documents_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
total_count = dict(cursor.fetchone())["count"]
logger.info(f"Total count of documents to transfer: {total_count}")

failed_inserts_count = 0
successful_inserts_count = 0
last_document_id = 0

with open(
"documents/post_launch/sql/oats_documents_to_alcs_documents.sql",
"r",
encoding="utf-8",
) as sql_file:
documents_to_insert_sql = sql_file.read()
while True:
cursor.execute(
f"{documents_to_insert_sql} WHERE document_id > {last_document_id} ORDER BY document_id;"
)
rows = cursor.fetchmany(batch_size)
if not rows:
break
try:
documents_to_be_inserted_count = len(rows)

_insert_records(conn, cursor, rows)

last_document_id = dict(rows[-1])["document_id"]
successful_inserts_count = (
successful_inserts_count + documents_to_be_inserted_count
)

logger.debug(
f"retrieved/inserted items count: {documents_to_be_inserted_count}; total successfully inserted/updated documents so far {successful_inserts_count}; last inserted oats_document_id: {last_document_id}"
)
except Exception as e:
conn.rollback()
logger.exception(f"Error {e}")
failed_inserts_count += len(rows)
                    # Advance one id so the next fetch does not repeat the identical failing batch.
                    last_document_id = last_document_id + 1

logger.info(f"Total amount of successful inserts: {successful_inserts_count}")
logger.info(f"Total amount of failed inserts: {failed_inserts_count}")


def _insert_records(conn, cursor, rows):
number_of_rows_to_insert = len(rows)

if number_of_rows_to_insert > 0:
insert_query = _compile_insert_query(number_of_rows_to_insert)
rows_to_insert = _prepare_data_to_insert(rows)
cursor.execute(insert_query, rows_to_insert)
conn.commit()


def _compile_insert_query(number_of_rows_to_insert):
documents_to_insert = ",".join(["%s"] * number_of_rows_to_insert)
return f"""
INSERT INTO alcs."document"(
oats_document_id,
file_name,
oats_application_id,
audit_created_by,
file_key,
mime_type,
tags,
"system",
uploaded_at,
source
)
VALUES{documents_to_insert}
ON CONFLICT (oats_document_id) DO UPDATE SET
oats_document_id = EXCLUDED.oats_document_id,
file_name = EXCLUDED.file_name,
oats_application_id = EXCLUDED.oats_application_id,
audit_created_by = EXCLUDED.audit_created_by,
file_key = EXCLUDED.file_key,
mime_type = EXCLUDED.mime_type,
tags = EXCLUDED.tags,
"system" = EXCLUDED."system",
uploaded_at = EXCLUDED.uploaded_at,
source = EXCLUDED.source;
"""


def _prepare_data_to_insert(rows):
    # Map each row, then collect its values as a tuple in insert-column order.
    data_to_insert = []
    for row in rows:
        mapped_row = _map_data(row)
        data_to_insert.append(tuple(mapped_row.values()))

    return data_to_insert


def _map_data(row):
    # Dict values are consumed positionally, so key order must match the
    # INSERT column list; "file_upload_date" feeds the uploaded_at column and
    # "file_source" feeds the source column.
    return {
"oats_document_id": row["oats_document_id"],
"file_name": row["file_name"],
"oats_application_id": row["oats_application_id"],
"audit_created_by": OATS_ETL_USER,
"file_key": row["file_key"],
"mime_type": _get_mime_type(row),
"tags": row["tags"],
"system": _map_system(row),
"file_upload_date": _get_upload_date(row),
"file_source": _get_document_source(row),
}


def _map_system(row):
who_created = row["who_created"]
if who_created in ("PROXY_OATS_LOCGOV", "PROXY_OATS_APPLICANT"):
sys = "OATS_P"
else:
sys = "OATS"
return sys


def _get_upload_date(data):
upload_date = data.get("uploaded_date", "")
created_date = data.get("when_created", "")
if upload_date:
return add_timezone_and_keep_date_part(upload_date)
else:
return add_timezone_and_keep_date_part(created_date)


def _get_document_source(data):
source = data.get("document_source_code", "")
if source:
source = str(OatsToAlcsDocumentSourceCode[source].value)

return source


def _get_mime_type(data):
file_name = data.get("file_name", "")
extension = os.path.splitext(file_name)[-1].lower().strip()
if extension == ".pdf":
return "application/pdf"
else:
return "application/octet-stream"


@inject_conn_pool
def document_clean(conn=None):
logger.info("Start documents cleaning")
with conn.cursor() as cursor:
cursor.execute(
f"DELETE FROM alcs.document WHERE audit_created_by = '{OATS_ETL_USER}' AND audit_created_at > '2024-02-08';"
)
        conn.commit()
        logger.info(f"Deleted items count = {cursor.rowcount}")
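
add_timezone_and_keep_date_part is imported from common and not shown here. A minimal sketch of what a helper with that name could do, assuming OATS stores naive timestamps and the intent is to keep the calendar date stable once a timezone is attached (an assumption about the real helper's behaviour):

from datetime import datetime, timezone


def add_timezone_and_keep_date_part(naive_dt):
    # Hypothetical sketch: keep only the calendar date and attach UTC so a
    # later timezone conversion cannot shift the date across midnight.
    if naive_dt is None:
        return None
    return datetime(naive_dt.year, naive_dt.month, naive_dt.day, tzinfo=timezone.utc)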
33 changes: 33 additions & 0 deletions bin/migrate-oats-data/documents/post_launch/sql/alcs_documents_to_notification_documents.sql
@@ -0,0 +1,33 @@
with oats_documents_to_map as (
select n.uuid as notification_uuid,
d.uuid as document_uuid,
adc.code,
publicly_viewable_ind as is_public,
app_lg_viewable_ind as is_app_lg,
od.document_id as oats_document_id,
od.alr_application_id as oats_application_id,
oaa.plan_no,
oaa.control_no
from oats.oats_documents od
join alcs."document" d on d.oats_document_id = od.document_id::text
join alcs.document_code adc on adc.oats_code = od.document_code
join alcs.notification n on n.file_number = od.alr_application_id::text
        join oats.oats_alr_applications oaa on od.alr_application_id = oaa.alr_application_id
)
select otm.notification_uuid,
otm.document_uuid,
otm.code as type_code,
    (
        case
            when is_public = 'Y'
            and is_app_lg = 'Y' then '{P, A, C, G}'::text[]
            when is_public = 'Y' then '{P}'::text[]
            when is_app_lg = 'Y' then '{A, C, G}'::text[]
            else '{}'::text[]
        end
    ) as visibility_flags,
oats_document_id,
oats_application_id,
plan_no,
control_no
from oats_documents_to_map otm
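
Once the link step has run, a quick spot check can confirm that rows landed with the expected visibility flags. A hypothetical check with psycopg2 (the table and column names are from this diff; the DSN environment variable and the literal ETL user value are assumptions):

import os

import psycopg2

# Hypothetical spot check; adjust the DSN and ETL user to your environment.
with psycopg2.connect(os.environ["DB_DSN"]) as conn:
    with conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT count(*) AS linked,
                   count(*) FILTER (WHERE 'P' = ANY(visibility_flags)) AS public_docs
            FROM alcs.notification_document
            WHERE audit_created_by = %s;
            """,
            ("oats_etl",),  # assumed value of OATS_ETL_USER
        )
        linked, public_docs = cursor.fetchone()
        print(f"linked={linked}, publicly visible={public_docs}")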