Skip to content

Commit

Permalink
Merge pull request #22 from CBIIT/crdcdh-1339-001-pgu
Browse files Browse the repository at this point in the history
Implement 1286
  • Loading branch information
n2iw authored Jul 18, 2024
2 parents a628af0 + 36d1268 commit f121de8
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 14 deletions.
2 changes: 2 additions & 0 deletions README-technical.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ Usage of the CLI tool:
-n --name-field
-s --size-field
-m --md5-field
-i --id-field
-o --omit-DCF-prefix

CLI configuration module will validate and combine parameters from CLI and/or config file
If config_file is given, then everything else is potentially optional (if it’s included in config file)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ You can put a manifest in the same folder with the data files, or you can put it
- name-field: column name in the manifest file that contains data file names
- size-field: column name in the manifest file that contains data file sizes
- md5-field: column name in the manifest file that contains data file MD5 checksums
- id-field: column name in the manifest file that contains data file ID
- omit-DCF-prefix: boolean; when false, file IDs must include the DCF prefix "dg.4DFC", when true, bare UUIDs are expected
- retries: number of retries the CLI tool will perform after a failed upload
- overwrite: if set to “true”, CLI will upload a data file to overwrite the data file with same name that already exists in the Data Hub target storage. If set to “false”, CLI will not upload a data file if a data file with the same name exists in the Data Hub target storage.
- dryrun: if set to “true”, CLI will not upload any data files to the Data Hub target storage. If set to “false”, CLI will upload data files to the Data Hub target storage.
Expand Down
4 changes: 4 additions & 0 deletions configs/uploader-file-config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Config:
data: /path_to_the_folder_that_contains_your_data_files
#path to manifest file, conditionally required when type = data file
manifest: /path_to_the_manifest_file
#file ID column name in the manifest file, as defined in the data model (file_id for CDS)
id-field: file_id
# whether file IDs omit the DCF prefix "dg.4DFC"; see the data model content.json
omit-DCF-prefix: false
#file name header name in the manifest file, default is file_name
name-field: file_name
#file size header name in the manifest file, default is file_size
Expand Down
3 changes: 3 additions & 0 deletions src/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
FILE_SIZE_FIELD = "size-field"
FILE_SIZE_DEFAULT = "file_size" #match data model file size name
FILE_MD5_FIELD = "md5-field"
FILE_ID_FIELD = "id-field"
FILE_ID_DEFAULT = "fileID"
OMIT_DCF_PREFIX = "omit-DCF-prefix"
MD5_DEFAULT = "md5sum" #match data model md5 name
TOKEN = "token"
API_URL = "api-url"
Expand Down
4 changes: 4 additions & 0 deletions src/common/graphql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def create_batch(self, file_array):
filePrefix,
type,
fileCount,
files {{
fileID,
fileName,
}}
status,
createdAt
}}
Expand Down
32 changes: 32 additions & 0 deletions src/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import sys
import csv
from uuid import UUID

"""
clean_up_key_value(dict)
Expand Down Expand Up @@ -47,4 +48,35 @@ def dump_dict_to_tsv(dict_list, file_path):
dict_writer.writerows(dict_list)
return True

def is_valid_uuid(uuid_to_test, version=5):
    """
    Check whether uuid_to_test is a canonically formatted UUID string.

    A string is accepted when it round-trips through uuid.UUID unchanged
    for some version between 1 and *version*: i.e. it is in lowercase
    hyphenated canonical form, uses the RFC 4122 variant, and its version
    nibble is within range.

    Parameters
    ----------
    uuid_to_test : str
        Candidate UUID string.
    version : int, optional
        Highest UUID version to accept (1-5). Defaults to 5, so any
        standard UUID version passes.

    Returns
    -------
    bool
        True if uuid_to_test is a valid UUID, otherwise False.

    Examples
    --------
    >>> is_valid_uuid('c9bf9e57-1685-4c89-bafb-ff5af830be8a')
    True
    >>> is_valid_uuid('c9bf9e58')
    False
    """
    try:
        # UUID(..., version=v) forces the version and variant bits, so the
        # round-trip only matches when the input already carries them.
        # Trying each version from high to low replaces the previous
        # recursive implementation without re-parsing per recursion level.
        return any(
            str(UUID(uuid_to_test, version=v)) == uuid_to_test
            for v in range(int(version), 0, -1)
        )
    except ValueError:
        # Malformed UUID string, or *version* outside the legal 1-5 range.
        return False


67 changes: 55 additions & 12 deletions src/file_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import glob

from common.constants import UPLOAD_TYPE, TYPE_FILE, TYPE_MATE_DATA, FILE_NAME_DEFAULT, FILE_SIZE_DEFAULT, MD5_DEFAULT, \
FILE_DIR, FILE_MD5_FIELD, PRE_MANIFEST, FILE_NAME_FIELD, FILE_SIZE_FIELD, FILE_PATH, SUCCEEDED, ERRORS
from common.utils import clean_up_key_value, clean_up_strs, get_exception_msg
FILE_DIR, FILE_MD5_FIELD, PRE_MANIFEST, FILE_NAME_FIELD, FILE_SIZE_FIELD, FILE_PATH, SUCCEEDED, ERRORS, FILE_ID_DEFAULT,\
FILE_ID_FIELD, OMIT_DCF_PREFIX
from common.utils import clean_up_key_value, clean_up_strs, get_exception_msg, is_valid_uuid
from bento.common.utils import get_logger, get_md5


Expand All @@ -26,6 +27,9 @@ def __init__(self, configs):
self.fileList = [] #list of files object {file_name, file_path, file_size, invalid_reason}
self.log = get_logger('File_Validator')
self.invalid_count = 0
self.has_file_id = None
self.manifest_rows = None
self.field_names = None

def validate(self):
# check file dir
Expand Down Expand Up @@ -64,65 +68,104 @@ def validate_size_md5(self):
self.files_info = self.read_manifest()
if not self.files_info or len(self.files_info ) == 0:
return False

line_num = 2
for info in self.files_info:
invalid_reason = ""
file_path = os.path.join(self.file_dir, info[FILE_NAME_DEFAULT])
size = info.get(FILE_SIZE_DEFAULT)
size_info = 0 if not size or not size.isdigit() else int(size)
info[FILE_SIZE_DEFAULT] = size_info #convert to int

file_id = info.get(FILE_ID_DEFAULT)
if not os.path.isfile(file_path):
invalid_reason += f"File {file_path} does not exist!"
#file dictionary: {FILE_NAME_DEFAULT: None, FILE_SIZE_DEFAULT: None, FILE_INVALID_REASON: None}
self.fileList.append({FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: size_info, MD5_DEFAULT: None, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.fileList.append({FILE_ID_DEFAULT: file_id, FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: size_info, MD5_DEFAULT: None, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.invalid_count += 1
continue

file_size = os.path.getsize(file_path)
if file_size != size_info:
invalid_reason += f"Real file size {file_size} of file {info[FILE_NAME_DEFAULT]} does not match with that in manifest {info[FILE_SIZE_DEFAULT]}!"
self.fileList.append({FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: None, SUCCEEDED: False, ERRORS: invalid_reason})
self.fileList.append({FILE_ID_DEFAULT: file_id, FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: None, SUCCEEDED: False, ERRORS: invalid_reason})
self.invalid_count += 1
continue

md5_info = info[MD5_DEFAULT]
if not md5_info:
invalid_reason += f"MD5 of {info[FILE_NAME_DEFAULT]} is not set in the pre-manifest!"
self.fileList.append({FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: None, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.fileList.append({FILE_ID_DEFAULT: file_id, FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: None, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.invalid_count += 1
continue
#calculate file md5
md5sum = get_md5(file_path)
if md5_info != md5sum:
invalid_reason += f"Real file md5 {md5sum} of file {info[FILE_NAME_DEFAULT]} does not match with that in manifest {md5_info}!"
self.fileList.append({FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: md5sum, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.fileList.append({FILE_ID_DEFAULT: file_id, FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: md5sum, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.invalid_count += 1
continue
# validate file id
result, msg = self.validate_file_id(file_id, line_num)
if not result:
invalid_reason += msg
self.fileList.append({FILE_ID_DEFAULT: file_id, FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: md5sum, SUCCEEDED: False, ERRORS: [invalid_reason]})
self.invalid_count += 1

self.fileList.append({FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: md5sum, SUCCEEDED: None, ERRORS: None})

self.fileList.append({FILE_ID_DEFAULT: file_id, FILE_NAME_DEFAULT: info.get(FILE_NAME_DEFAULT), FILE_PATH: file_path, FILE_SIZE_DEFAULT: file_size, MD5_DEFAULT: md5sum, SUCCEEDED: None, ERRORS: None})
line_num += 1
return True

#public function to read pre-manifest and return list of file records
def read_manifest(self):
    """
    Read the pre-manifest TSV and return a list of file-record dicts.

    Each record carries the default keys (file ID, name, size, md5) mapped
    from the column names configured in self.configs.

    Side effects:
      - self.field_names: cleaned header names of the pre-manifest.
      - self.manifest_rows: every cleaned row, in file order.
      - self.has_file_id: set on the first data row to whether the
        configured file-ID column exists in the pre-manifest header.

    Returns an empty list when the file cannot be read or has no rows;
    read errors are logged rather than raised.
    """
    files_info = []
    # keyed by file name, so a duplicate file name keeps only the last row
    files_dict = {}
    manifest_rows = []
    try:
        with open(self.pre_manifest) as pre_m:
            reader = csv.DictReader(pre_m, delimiter='\t')
            self.field_names = clean_up_strs(reader.fieldnames)
            # NOTE(review): this guard is redundant — field_names was just
            # assigned unconditionally above; presumably leftover from an
            # earlier edit. Confirm before removing.
            if not self.field_names:
                self.field_names = clean_up_strs(reader.fieldnames)
            for info in reader:
                file_info = clean_up_key_value(info)
                manifest_rows.append(file_info)
                file_name = file_info[self.configs.get(FILE_NAME_FIELD)]
                file_id = file_info.get(self.configs.get(FILE_ID_FIELD))
                if self.has_file_id is None:
                    # detect once (first row) whether the pre-manifest
                    # declares the configured file-ID column
                    self.has_file_id = self.configs.get(FILE_ID_FIELD) in info.keys()
                files_dict.update({file_name: {
                    FILE_ID_DEFAULT: file_id,
                    FILE_NAME_DEFAULT: file_name,
                    FILE_SIZE_DEFAULT: file_info[self.configs.get(FILE_SIZE_FIELD)],
                    MD5_DEFAULT: file_info[self.configs.get(FILE_MD5_FIELD)]
                }})
            files_info = list(files_dict.values())
            self.manifest_rows = manifest_rows
    except Exception as e:
        self.log.debug(e)
        self.log.exception(f"Reading manifest failed - internal error. Please try again and contact the helpdesk if this error persists.")
    return files_info


def validate_file_id(self, id, line_num):
    """
    Validate one file ID value from the pre-manifest.

    :param id: file ID value read from the pre-manifest row; may be None or
        empty when the column is absent or the cell is blank.
    :param line_num: line number of the row in the pre-manifest, used in
        error messages.
    :return: (True, None) when the ID is acceptable, otherwise
        (False, error_message). The error is also logged.
    """
    id_field_name = self.configs.get(FILE_ID_FIELD)

    if not id:
        # No ID given: only an error when the manifest declared the ID column.
        if self.has_file_id:
            msg = f'Line {line_num}: "{id_field_name}" is required but not provided. You can provide correct "{id_field_name}" or remove the column and let the system generate it for you.'
            self.log.error(msg)
            return False, msg
        return True, None

    # `is False` (identity) deliberately mirrors the original `== False`
    # check: only an explicit boolean False selects the DCF-prefix branch.
    if self.configs[OMIT_DCF_PREFIX] is False:
        # ID must look like "dg.4DFC/<uuid>". The startswith guard also
        # guarantees the split below yields a second element.
        if not id.startswith("dg.4DFC/") or not is_valid_uuid(id.split("/")[1]):
            msg = f'Line {line_num}: "{id_field_name}": "{id}" is not in correct format. A correct "{id_field_name}" should look like "dg.4DFC/e041576e-3595-5c8b-b0b3-272bc7cb6aa8". You can provide correct "{id_field_name}" or remove the column and let the system generate it for you.'
            self.log.error(msg)
            return False, msg
        return True, None

    # omit-DCF-prefix mode: a bare UUID is expected.
    if not is_valid_uuid(id):
        msg = f'Line {line_num}: "{id_field_name}": "{id}" is not in correct format. A correct "{id_field_name}" should look like "e041576e-3595-5c8b-b0b3-272bc7cb6aa8". You can provide correct "{id_field_name}" or remove the column and let the system generate it for you.'
        self.log.error(msg)
        return False, msg

    return True, None
92 changes: 92 additions & 0 deletions src/process_manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import csv, os, io
from common.constants import FILE_ID_DEFAULT, FILE_NAME_FIELD, BATCH_BUCKET, S3_BUCKET, FILE_PREFIX, BATCH_ID, BATCH, BATCH_CREATED,\
FILE_ID_FIELD, UPLOAD_TYPE, FILE_NAME_DEFAULT, FILE_PATH, FILE_SIZE_DEFAULT, BATCH_STATUS, PRE_MANIFEST
from common.graphql_client import APIInvoker
from copier import Copier


def process_manifest_file(configs, has_file_id, file_infos, manifest_rows, manifest_columns):
    """
    function: process_manifest_file
    params:
        configs: the config object of uploader
        has_file_id: whether the pre-manifest file has file id column or not
        file_infos: the file info array of the pre-manifest file
        manifest_rows: the rows of the pre-manifest file
        manifest_columns: the columns of the pre-manifest file
    return:
        True or False
    steps:
        1) add file id to the pre-manifest file if no file id column
        2) create a batch for upload the final manifest file
        3) upload the final manifest file to S3
        4) update the batch with file info.
    """
    # Bug fix: file_path must be resolved before the empty-input check below,
    # which previously referenced it before assignment (NameError).
    file_path = configs.get(PRE_MANIFEST)
    if not file_infos:
        print(f"Failed to add file id to the pre-manifest, {file_path}.")
        return False
    # When the pre-manifest already has the ID column, upload it as-is;
    # otherwise derive a "-final.tsv" sibling path for the augmented copy.
    if has_file_id:
        final_manifest_path = file_path
    elif ".tsv" in file_path:
        final_manifest_path = file_path.replace(".tsv", "-final.tsv")
    else:
        final_manifest_path = file_path.replace(".txt", "-final.tsv")
    file_id_name = configs[FILE_ID_FIELD]
    manifest_columns.append(file_id_name)
    result = None
    newBatch = None
    try:
        if not has_file_id:
            result = add_file_id(file_id_name, final_manifest_path, file_infos, manifest_rows, manifest_columns)
            if not result:
                print(f"Failed to add file id to the pre-manifest, {final_manifest_path}.")
                return False
        # Bug fix: reset before the upload steps so an exception raised below
        # cannot leave a stale truthy (non-dict) value in `result`, which made
        # the `finally` block call .get() on a bool.
        result = None
        # create a batch for upload the final manifest file
        manifest_file_size = os.path.getsize(final_manifest_path)
        configs[UPLOAD_TYPE] = "metadata"
        apiInvoker = APIInvoker(configs)
        final_manifest_name = os.path.basename(final_manifest_path)
        file_array = [{"fileName": final_manifest_name, "size": manifest_file_size}]
        if apiInvoker.create_batch(file_array):
            newBatch = apiInvoker.new_batch
            if not newBatch.get(BATCH_BUCKET) or not newBatch[FILE_PREFIX] or not newBatch.get(BATCH_ID):
                print("Failed to upload files: can't create new batch! Please check log file in tmp folder for details.")
                return False
            configs[S3_BUCKET] = newBatch.get(BATCH_BUCKET)
            configs[FILE_PREFIX] = newBatch[FILE_PREFIX]
            configs[BATCH_ID] = newBatch.get(BATCH_ID)
            print(f"New batch is created: {newBatch.get(BATCH_ID)} at {newBatch[BATCH_CREATED]}")
            uploader = Copier(configs[S3_BUCKET], configs[FILE_PREFIX], configs)
            result = uploader.copy_file({FILE_NAME_DEFAULT: final_manifest_name, FILE_PATH: final_manifest_path, FILE_SIZE_DEFAULT: manifest_file_size}, True, False)
    except Exception as e:
        print(f"Failed to add file id to the pre-manifest, {file_path}. Error: {e}")
    finally:
        # NOTE: returning from `finally` intentionally converts any failure
        # above (logged in `except`) into a False result.
        if not newBatch or not result:
            print(f"Failed process the manifest, {final_manifest_path}.")
            return False
        else:
            # update batch with the outcome of the manifest upload
            status = result.get(BATCH_STATUS, False)
            errors = [f"Failed to upload manifest file,{final_manifest_name}"] if not status else []
            manifest_file_info = {"fileName": final_manifest_name, "succeeded": status, "errors": errors, "skipped": False}
            if not apiInvoker.update_batch(newBatch[BATCH_ID], [manifest_file_info]):
                print(f"Failed to update batch, {newBatch[BATCH_ID]}!")
                return False
        print(f"Successfully process the manifest, {final_manifest_path}.")
        return True

# This method will create a new manifest file with the file id column added to the pre-manifest.
def add_file_id(file_id_name, final_manifest_path, file_infos, manifest_rows, manifest_columns):
    """
    Write final_manifest_path as a TSV of the pre-manifest rows with the
    file ID column (file_id_name) filled in from file_infos.

    :param file_id_name: name of the file ID column to populate.
    :param final_manifest_path: output path of the final manifest TSV.
    :param file_infos: records carrying "fileName" and the file ID value.
    :param manifest_rows: cleaned pre-manifest rows (dicts keyed by column).
    :param manifest_columns: header row to write, already including file_id_name.
    :return: True on success, False when a file has no matching manifest row.
    """
    output = []
    for file_info in file_infos:
        file_name = file_info["fileName"]
        # Bug fix: the original `[...][0]` raised IndexError when no row
        # matched; report the problem and fail cleanly instead.
        row = next((r for r in manifest_rows if r[FILE_NAME_DEFAULT] == file_name), None)
        if row is None:
            print(f"Failed to find the file, {file_name}, in the pre-manifest.")
            return False
        row[file_id_name] = file_info[FILE_ID_DEFAULT]
        output.append(row.values())
    # newline="" is required by the csv module so the writer controls line
    # endings (avoids blank lines on Windows).
    with open(final_manifest_path, 'w', newline="") as f:
        writer = csv.writer(f, delimiter='\t')
        writer.writerow(manifest_columns)
        writer.writerows(output)
    return True
14 changes: 13 additions & 1 deletion src/upload_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
from common.constants import UPLOAD_TYPE, UPLOAD_TYPES, FILE_NAME_DEFAULT, FILE_SIZE_DEFAULT, MD5_DEFAULT, \
API_URL, TOKEN, SUBMISSION_ID, FILE_DIR, FILE_MD5_FIELD, PRE_MANIFEST, FILE_NAME_FIELD, FILE_SIZE_FIELD, RETRIES, OVERWRITE, \
DRY_RUN, TYPE_FILE
DRY_RUN, TYPE_FILE, FILE_ID_FIELD, OMIT_DCF_PREFIX
from bento.common.utils import get_logger
from common.utils import clean_up_key_value

Expand All @@ -24,6 +24,9 @@ def __init__(self):
parser.add_argument('-n', '--name-field', help='header file name in manifest, optional, default value is "file_name"')
parser.add_argument('-s', '--size-field', help='header file size in manifest, optional, default value is "file_size"')
parser.add_argument('-m', '--md5-field', help='header md5 name in manifest, optional, default value is "md5sum"')
parser.add_argument('-i', '--id-field', help='header file ID name in manifest, optional, default value is "file_id"')
parser.add_argument('-o', '--omit-DCF-prefix', help='boolean to define if need DCF prefix "dg.4DFC"')

parser.add_argument('-r', '--retries', default=3, type=int, help='file uploading retries, optional, default value is 3')

#for better user experience, using configuration file to pass all args above
Expand Down Expand Up @@ -125,6 +128,15 @@ def validate(self):
md5_header = self.data.get(FILE_MD5_FIELD)
if md5_header is None:
self.data[FILE_MD5_FIELD] = MD5_DEFAULT

file_id_header= self.data.get(FILE_ID_FIELD)
if file_id_header is None:
self.log.critical(f'file id field is required.')
return False

omit_dcf_prefix = self.data.get(OMIT_DCF_PREFIX)
if omit_dcf_prefix is None:
self.data[OMIT_DCF_PREFIX] = False

filepath = self.data.get(FILE_DIR)
if filepath is None:
Expand Down
Loading

0 comments on commit f121de8

Please sign in to comment.