Skip to content

Commit

Permalink
Merge pull request #1483 from bcgov/feature/ALCS-1706-2
Browse files Browse the repository at this point in the history
add srw-only import
  • Loading branch information
lstod authored Mar 7, 2024
2 parents 2a2868b + c20c535 commit db446d1
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 2 deletions.
13 changes: 13 additions & 0 deletions bin/migrate-files/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The files are uploaded in the format `/migrate/application||issue||planning_revi
- `document_id` is the primary key from the documents table
- `filename` is the filename metadata from the documents table

Note: SRWs are stored in the application folder but imported separately

## Libraries Used

os: used to interact with the file system
Expand Down Expand Up @@ -72,6 +74,7 @@ To run the script, run the following command:
python migrate-files.py application
python migrate-files.py application --start_document_id=500240 --end_document_id=505260 --last_imported_document_id=500475
```
Note: SRWs are stored in the application folder but imported separately

Application document import supports running multiple terminals at the same time by specifying batches of data to import.

Expand Down Expand Up @@ -117,6 +120,11 @@ python migrate-files.py planning
python migrate-files.py issue
```

```sh
# to start srw document import
python migrate-files.py srw
```

M1:

```sh
Expand All @@ -134,6 +142,11 @@ python3-intel64 migrate-files.py planning
python3-intel64 migrate-files.py issue
```

```sh
# to start srw document import
python3-intel64 migrate-files.py srw
```

The script will start uploading files from the Oracle database to DELL ECS. The upload progress will be displayed in a progress bar. For Planning and Issues documents the script will also save the last uploaded document id, so the upload process can be resumed from where it left off in case of any interruption. For the Application documents import it is the responsibility of whoever is running the process to specify "last_imported_document_id".

## Windows
Expand Down
1 change: 1 addition & 0 deletions bin/migrate-files/application_docs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .application_docs_import import import_application_docs
from .srw_docs_import import import_srw_docs
232 changes: 232 additions & 0 deletions bin/migrate-files/application_docs/srw_docs_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
from tqdm import tqdm
import cx_Oracle
from common import (
LAST_IMPORTED_APPLICATION_FILE,
DocumentUploadBasePath,
upload_file_to_s3,
get_starting_document_id,
get_max_file_size,
EntityType,
handle_document_processing_error,
fetch_data_from_oracle,
process_results,
log_last_imported_file,
generate_log_file_name,
)

# Resume-log file for this import run.
# NOTE(review): this reuses LAST_IMPORTED_APPLICATION_FILE, so the SRW import
# shares its resume-state file naming with the application import — confirm
# that running both imports concurrently is not intended.
log_file_name = generate_log_file_name(LAST_IMPORTED_APPLICATION_FILE)


def import_srw_docs(
    batch,
    cursor,
    conn,
    s3,
    start_document_id_arg,
    end_document_id_arg,
    last_imported_document_id_arg,
):
    """Upload SRW-related application documents from Oracle to S3.

    SRW documents are stored under the application path but imported
    separately: only documents whose application has an ALR component with
    change code 'SRW' are selected (see ``_document_query``).

    Args:
        batch: maximum number of rows fetched per query round-trip.
        cursor: open Oracle cursor used for all reads.
        conn: Oracle connection, passed to the shared error handler.
        s3: S3 client used as the upload target.
        start_document_id_arg: lower bound for the id window (0 = no bound).
        end_document_id_arg: upper bound for the id window (0 = no bound).
        last_imported_document_id_arg: resume point; 0 starts from scratch.
    """
    # Total number of SRW files inside the requested id window.
    application_count = _get_total_number_of_files(
        cursor, start_document_id_arg, end_document_id_arg
    )
    # Progress-bar offset: when resuming, count how many files in the window
    # were already transferred up to the resume point.
    offset = (
        last_imported_document_id_arg
        if last_imported_document_id_arg == 0
        else _get_total_number_of_transferred_files(
            cursor, start_document_id_arg, last_imported_document_id_arg
        )
    )
    # NOTE(review): labels and upload paths deliberately reuse
    # EntityType.APPLICATION / DocumentUploadBasePath.APPLICATION because SRW
    # files live in the application folder — confirm no separate SRW entity
    # type was intended.
    print(
        f"{EntityType.APPLICATION.value} count = {application_count} offset = {offset}"
    )
    starting_document_id = last_imported_document_id_arg

    # Track progress across batches so the run can be resumed after failure.
    documents_processed = 0
    last_document_id = starting_document_id

    try:
        with tqdm(
            total=application_count,
            initial=offset,
            unit="file",
            desc=f"Uploading {EntityType.APPLICATION.value} files to S3",
        ) as documents_upload_progress_bar:
            max_file_size = get_max_file_size(cursor)

            while True:
                starting_document_id = get_starting_document_id(
                    starting_document_id, last_document_id, EntityType.APPLICATION.value
                )

                params = {
                    "starting_document_id": starting_document_id,
                    "end_document_id": end_document_id_arg,
                    "max_file_size": max_file_size,
                    "batch_size": batch,
                }
                data = fetch_data_from_oracle(_document_query, cursor, params)

                # No more rows in the window: import is complete.
                if not data:
                    break
                # Upload the batch to S3, advancing the progress bar per file.
                for (
                    file_size,
                    document_id,
                    application_id,
                    filename,
                    file,
                ) in data:
                    # Show the destination key being written (matches the
                    # <application_id>/<document_id>_<filename> layout).
                    tqdm.write(f"{application_id}/{document_id}_{filename}")

                    upload_file_to_s3(
                        s3,
                        DocumentUploadBasePath.APPLICATION.value,
                        file_size,
                        document_id,
                        application_id,
                        filename,
                        file,
                    )

                    documents_upload_progress_bar.update(1)
                    last_document_id = document_id
                    documents_processed += 1
                # Persist the resume point once per batch.
                log_last_imported_file(last_document_id, log_file_name)

    except Exception as error:
        # Delegate logging/cleanup to the shared handler; the resume point is
        # preserved so the run can be restarted from last_document_id.
        handle_document_processing_error(
            cursor,
            conn,
            error,
            EntityType.APPLICATION.value,
            documents_processed,
            last_document_id,
            log_file_name,
        )

    # Display summary of what was processed.
    process_results(
        EntityType.APPLICATION.value,
        application_count,
        documents_processed,
        last_document_id,
        log_file_name,
    )


# Batch query for SRW documents. The app_docs_srw CTE restricts to documents
# whose application has an ALR component with change code 'SRW'; the second
# CTE computes a running (cumulative) blob size so the outer SELECT can cap
# each batch by both row count (:batch_size) and total bytes (:max_file_size).
# Bounds: :starting_document_id is exclusive, :end_document_id inclusive
# (0 disables the upper bound).
_document_query = """
WITH app_docs_srw AS (
SELECT document_id FROM oats.oats_documents od
LEFT JOIN oats.oats_alr_appl_components oaac ON oaac.alr_application_id = od.alr_application_id
WHERE oaac.alr_change_code = 'SRW'
GROUP BY od.document_id
),
documents_with_cumulative_file_size AS (
SELECT
ROW_NUMBER() OVER(
ORDER BY od.DOCUMENT_ID ASC
) row_num,
dbms_lob.getLength(DOCUMENT_BLOB) file_size,
SUM(dbms_lob.getLength(DOCUMENT_BLOB)) OVER (ORDER BY od.DOCUMENT_ID ASC ROWS UNBOUNDED PRECEDING) AS cumulative_file_size,
od.DOCUMENT_ID,
ALR_APPLICATION_ID,
FILE_NAME,
DOCUMENT_BLOB,
DOCUMENT_CODE,
DESCRIPTION,
DOCUMENT_SOURCE_CODE,
UPLOADED_DATE,
WHEN_UPDATED,
REVISION_COUNT
FROM
OATS.OATS_DOCUMENTS od
JOIN app_docs_srw appds ON appds.document_id = od.document_id -- this will filter out all non SRW related documents
WHERE
dbms_lob.getLength(DOCUMENT_BLOB) > 0
AND od.DOCUMENT_ID > :starting_document_id
AND (:end_document_id = 0 OR od.DOCUMENT_ID <= :end_document_id)
AND ALR_APPLICATION_ID IS NOT NULL
ORDER BY
DOCUMENT_ID ASC
)
SELECT
file_size,
docwc.DOCUMENT_ID,
ALR_APPLICATION_ID,
FILE_NAME,
DOCUMENT_BLOB
FROM
documents_with_cumulative_file_size docwc
WHERE
cumulative_file_size < :max_file_size
AND row_num < :batch_size
ORDER BY
docwc.DOCUMENT_ID ASC
"""


def _get_total_number_of_files(cursor, start_document_id, end_document_id):
    """Count SRW documents with a non-empty blob inside the id window.

    Either bound may be 0, which disables that bound. Wraps any Oracle
    driver error in a generic Exception.
    """
    count_query = """
        WITH app_docs_srw AS (
            SELECT document_id FROM oats.oats_documents od
            LEFT JOIN oats.oats_alr_appl_components oaac ON oaac.alr_application_id = od.alr_application_id
            WHERE oaac.alr_change_code = 'SRW'
            GROUP BY od.document_id
        )
        SELECT COUNT(*)
        FROM OATS.OATS_DOCUMENTS od
        JOIN app_docs_srw ON app_docs_srw.document_id = od.document_id
        WHERE dbms_lob.getLength(DOCUMENT_BLOB) > 0
        AND ALR_APPLICATION_ID IS NOT NULL
        AND (:start_document_id = 0 OR od.DOCUMENT_ID > :start_document_id)
        AND (:end_document_id = 0 OR od.DOCUMENT_ID <= :end_document_id)
        """
    bind_vars = {
        "start_document_id": start_document_id,
        "end_document_id": end_document_id,
    }
    try:
        cursor.execute(count_query, bind_vars)
        (total,) = cursor.fetchone()
        return total
    except cx_Oracle.Error as err:
        raise Exception("Oracle Error: {}".format(err))


def _get_total_number_of_transferred_files(cursor, start_document_id, end_document_id):
    """Count SRW documents already transferred within (start, end].

    Unlike the total count, the lower bound here is always applied (it is the
    resume point, strictly exclusive); the upper bound of 0 means unbounded.
    Wraps any Oracle driver error in a generic Exception.
    """
    transferred_query = """
        WITH app_docs_srw AS (
            SELECT document_id FROM oats.oats_documents od
            LEFT JOIN oats.oats_alr_appl_components oaac ON oaac.alr_application_id = od.alr_application_id
            WHERE oaac.alr_change_code = 'SRW'
            GROUP BY od.document_id
        )
        SELECT COUNT(*)
        FROM OATS.OATS_DOCUMENTS od
        JOIN app_docs_srw ON app_docs_srw.document_id = od.document_id
        WHERE dbms_lob.getLength(DOCUMENT_BLOB) > 0
        AND ALR_APPLICATION_ID IS NOT NULL
        AND od.DOCUMENT_ID > :start_document_id
        AND (:end_document_id = 0 OR od.DOCUMENT_ID <= :end_document_id)
        """
    bind_vars = {
        "start_document_id": start_document_id,
        "end_document_id": end_document_id,
    }
    try:
        cursor.execute(transferred_query, bind_vars)
        (transferred,) = cursor.fetchone()
        return transferred
    except cx_Oracle.Error as err:
        raise Exception("Oracle Error: {}".format(err))
14 changes: 12 additions & 2 deletions bin/migrate-files/migrate-files.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
ecs_access_key,
ecs_secret_key,
)
from application_docs import import_application_docs
from application_docs import import_application_docs, import_srw_docs
from planning_docs import import_planning_review_docs
from issue_docs import import_issue_docs
import argparse
Expand Down Expand Up @@ -61,6 +61,16 @@ def main(args):
import_planning_review_docs(batch_size, cursor, conn, s3)
elif args.document_type == "issue":
import_issue_docs(batch_size, cursor, conn, s3)
elif args.document_type == "srw":
import_srw_docs(
batch_size,
cursor,
conn,
s3,
start_document_id,
end_document_id,
last_imported_document_id,
)

print("File upload complete, closing connection")

Expand All @@ -73,7 +83,7 @@ def _parse_command_line_args(args):
parser = argparse.ArgumentParser()
parser.add_argument(
"document_type",
choices=["application", "planning", "issue"],
choices=["application", "planning", "issue", "srw"],
help="Document type to be processed",
)
parser.add_argument(
Expand Down

0 comments on commit db446d1

Please sign in to comment.