Skip to content

Commit

Permalink
Addressing Codacy Issues
Browse files Browse the repository at this point in the history
  • Loading branch information
AustinSMueller committed Oct 22, 2021
1 parent d1a1eb5 commit 785f3ee
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 243 deletions.
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ This is the documentation index for the NCI ICDC/CTDC Data Loader
## Module List
The NCI ICDC/CTDC Data Loader includes multiple data loading modules:

* **Data Loader**
* The Data Loader module is a versatile Python application used to load data into a Neo4j database.
* [Data Loader Documentation](docs/data-loader.md)
* **File Copier**
* The File Copier module copies files from a source URL to a designated AWS S3 Bucket.
* [File Copier Documentation](docs/file-copier.md)
- **Data Loader**
- The Data Loader module is a versatile Python application used to load data into a Neo4j database.
- [Data Loader Documentation](docs/data-loader.md)

- **File Copier**
- The File Copier module copies files from a source URL to a designated AWS S3 Bucket.
- [File Copier Documentation](docs/file-copier.md)

* **File Loader**
* The File Loader module processes incoming S3 files and then calls the Data Loader module to load the processed file data into a Neo4j database.
* [File Loader Documentation](docs/file-loader.md)
- **File Loader**
- The File Loader module processes incoming S3 files and then calls the Data Loader module to load the processed file data into a Neo4j database.
- [File Loader Documentation](docs/file-loader.md)

* **Model Converter**
* The Model Converter uses a combination of YAML format schema files, a YAML formatted properties files, and a GraphQL formatted queries file to generate a GraphQL formatted schema.
* [Model Converter Documentation](docs/model-converter.md)
- **Model Converter**
- The Model Converter uses a combination of YAML format schema files, a YAML formatted properties files, and a GraphQL formatted queries file to generate a GraphQL formatted schema.
- [Model Converter Documentation](docs/model-converter.md)
1 change: 0 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from configparser import ConfigParser
import os
import yaml

Expand Down
110 changes: 55 additions & 55 deletions copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,37 @@
from bento.common.s3 import S3Bucket


def _is_valid_url(org_url):
return re.search(r'^[^:/]+://', org_url)


def _is_local(org_url):
return org_url.startswith('file://')


def _get_local_path(org_url):
if _is_local(org_url):
return org_url.replace('file://', '')
else:
raise ValueError(f'{org_url} is not a local file!')


def _get_org_md5(org_url, local_file):
    """
    Get the original file's MD5, downloading the file first if it is remote.

    For a 'file://' URL the hash is computed directly on the local path;
    otherwise the file is streamed down to *local_file* and hashed there.

    :param org_url: original file URL ('file://' or a remote scheme)
    :param local_file: local path to download a remote file to before hashing
    :return: MD5 digest of the original file (as produced by get_md5)
    :raises RuntimeError: if the remote download did not produce a local file
    """
    if _is_local(org_url):
        file_path = _get_local_path(org_url)
        return get_md5(file_path)
    # Remote file: download to local, verify it landed, then hash the copy.
    stream_download(org_url, local_file)
    if not os.path.isfile(local_file):
        # RuntimeError instead of bare Exception (Codacy/lint issue); it is
        # an Exception subclass, so existing `except Exception` callers
        # still catch it.
        raise RuntimeError(f'Download file {org_url} to local failed!')
    return get_md5(local_file)


class Copier:
adapter_attrs = ['load_file_info', 'clear_file_info', 'get_org_url', 'get_file_name', 'get_org_md5',
Expand Down Expand Up @@ -47,7 +78,7 @@ def __init__(self, bucket_name, prefix, adapter):
# Verify adapter has all functions needed
for attr in self.adapter_attrs:
if not hasattr(adapter, attr):
raise TypeError(f'Adapter doesn\'t have "{attr}" attribute/method')
raise TypeError(f'Adapter does not have "{attr}" attribute/method')
self.adapter = adapter

self.log = get_logger('Copier')
Expand All @@ -71,15 +102,15 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False):
:param file_info: dict that has file information
:param overwrite: overwrite file in S3 bucket even existing file has same size
:param dryrun: only do preliminary check, don't copy file
:param verify_md5: verify file size and MD5 in file_info against orginal file
:param verify_md5: verify file size and MD5 in file_info against original file
:return: dict
"""
local_file = None
try:
self.adapter.clear_file_info()
self.adapter.load_file_info(file_info)
org_url = self.adapter.get_org_url()
if not self._is_valid_url(org_url):
if not _is_valid_url(org_url):
self.log.error(f'"{org_url}" is not a valid URL!')
return {self.STATUS: False}
if not self._file_exists(org_url):
Expand All @@ -99,11 +130,11 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False):
if not org_md5:
self.log.info(f'Original MD5 not available, calculate MD5 locally...')
local_file = f'tmp/{file_name}'
org_md5 = self._get_org_md5(org_url, local_file)
org_md5 = _get_org_md5(org_url, local_file)
elif verify_md5:
self.log.info(f'Downloading file and verifying MD5 locally...')
local_file = f'tmp/{file_name}'
local_md5 = self._get_org_md5(org_url, local_file)
local_md5 = _get_org_md5(org_url, local_file)
if local_md5 != org_md5:
self.log.error(f'MD5 verify failed! Original MD5: {org_md5}, local MD5: {local_md5}')
return {self.STATUS: False}
Expand All @@ -112,15 +143,13 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False):
self.log.info(f'Original MD5 {org_md5}')

succeed = {self.STATUS: True,
self.MD5: org_md5,
self.NAME: file_name,
self.KEY: key,
self.FIELDS: self.adapter.get_fields(),
self.ACL: self.adapter.get_acl(),
self.SIZE: org_size
}


self.MD5: org_md5,
self.NAME: file_name,
self.KEY: key,
self.FIELDS: self.adapter.get_fields(),
self.ACL: self.adapter.get_acl(),
self.SIZE: org_size
}

if dryrun:
self.log.info(f'Copying file {key} skipped (dry run)')
Expand All @@ -132,8 +161,8 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False):

self.log.info(f'Copying from {org_url} to s3://{self.bucket_name}/{key} ...')
# Original file is local
if self._is_local(org_url):
file_path = self._get_local_path(org_url)
if _is_local(org_url):
file_path = _get_local_path(org_url)
with open(file_path, 'rb') as stream:
dest_size = self._upload_obj(stream, key, org_size)
# Original file has been downloaded to local
Expand All @@ -160,35 +189,19 @@ def copy_file(self, file_info, overwrite, dryrun, verify_md5=False):
os.remove(local_file)

def _upload_obj(self, stream, key, org_size):
parts = org_size // self.MULTI_PART_CHUNK_SIZE
chunk_size = self.MULTI_PART_CHUNK_SIZE if parts < self.PARTS_LIMIT else org_size // self.PARTS_LIMIT
parts = org_size // self.MULTI_PART_CHUNK_SIZE
chunk_size = self.MULTI_PART_CHUNK_SIZE if parts < self.PARTS_LIMIT else org_size // self.PARTS_LIMIT

t_config = TransferConfig(multipart_threshold=self.MULTI_PART_THRESHOLD,
multipart_chunksize=chunk_size)
self.bucket._upload_file_obj(key, stream, t_config)
self.files_copied += 1
self.log.info(f'Copying file {key} SUCCEEDED!')
return self.bucket.get_object_size(key)

def _get_org_md5(self, org_url, local_file):
"""
Get original MD5, if adapter can't get it, calculate it from original file, download if necessary
:param org_url:
:return:
"""
if self._is_local(org_url):
file_path = self._get_local_path(org_url)
return get_md5(file_path)
else:
# Download to local and calculate MD5
stream_download(org_url, local_file)
if not os.path.isfile(local_file):
raise Exception(f'Download file {org_url} to local failed!')
return get_md5(local_file)
t_config = TransferConfig(multipart_threshold=self.MULTI_PART_THRESHOLD,
multipart_chunksize=chunk_size)
self.bucket.upload_file_obj(key, stream, t_config)
self.files_copied += 1
self.log.info(f'Copying file {key} SUCCEEDED!')
return self.bucket.get_object_size(key)

def _file_exists(self, org_url):
if self._is_local(org_url):
file_path = self._get_local_path(org_url)
if _is_local(org_url):
file_path = _get_local_path(org_url)
if not os.path.isfile(file_path):
self.log.error(f'"{file_path}" is not a file!')
return False
Expand All @@ -204,16 +217,3 @@ def _file_exists(self, org_url):
else:
self.log.error(f'Head file error - {r.status_code}: {org_url}')
return False

def _is_local(self, org_url):
return org_url.startswith('file://')

def _is_valid_url(self, org_url):
return re.search(r'^[^:/]+://', org_url)

def _get_local_path(self, org_url):
if self._is_local(org_url):
return org_url.replace('file://', '')
else:
raise ValueError(f'{org_url} is not a local file!')

Loading

0 comments on commit 785f3ee

Please sign in to comment.