Skip to content

Commit

Permalink
Updated Bento-common and use BentoConfig object for. configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
n2iw committed Feb 5, 2020
1 parent 3cb7fce commit 451c368
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 49 deletions.
47 changes: 24 additions & 23 deletions file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@

from bento.common.utils import UUID, NODES_CREATED, RELATIONSHIP_CREATED, removeTrailingSlash,\
get_logger, UPSERT_MODE, send_slack_message
from bento.common.config import INDEXD_GUID_PREFIX, INDEXD_MANIFEST_EXT, VISIBILITY_TIMEOUT, \
TEMP_FOLDER, PSWD_ENV, SLACK_URL
from bento.common.config import BentoConfig
from bento.common.props import Props
from bento.common.sqs import Queue, VisibilityExtender
from bento.common.data_loader import DataLoader
from bento.common.icdc_schema import ICDC_Schema, get_uuid_for_node
from bento.common.icdc_schema import ICDC_Schema

RAW_PREFIX = 'RAW'
FINAL_PREFIX = 'Final'
Expand Down Expand Up @@ -58,7 +57,10 @@


class FileLoader:
def __init__(self, queue_name, driver, schema, manifest_bucket, manifest_folder, dry_run=False):
def __init__(self, queue_name, driver, schema, config, manifest_bucket, manifest_folder, dry_run=False):
assert isinstance(config, BentoConfig)
self.config = config

self.log = get_logger('File Loader')
self.queue_name = queue_name
self.s3_client = boto3.client('s3')
Expand Down Expand Up @@ -227,23 +229,21 @@ def send_sqs_message(self, queue, data_bucket, data_path):
def get_s3_location(bucket, folder, key):
return "s3://{}/{}/{}".format(bucket, folder, key)

@staticmethod
def populate_indexd_record(record, file_info):
record[GUID] = '{}{}'.format(INDEXD_GUID_PREFIX, get_uuid_for_node("file", file_info[SHA512]))
def populate_indexd_record(self, record, file_info):
record[GUID] = '{}{}'.format(self.config.INDEXD_GUID_PREFIX, self.schema.get_uuid_for_node("file", file_info[SHA512]))
record[MD5] = file_info[MD5_SUM]
record[SIZE] = file_info[FILE_SIZE]
record[ACL] = DEFAULT_ACL
record[URL] = file_info[FILE_LOC]
return record

@staticmethod
def populate_record(record, file_info):
def populate_record(self, record, file_info):
file_name = file_info[FILE_NAME]
record[FILE_SIZE] = file_info[FILE_SIZE]
record[FILE_LOC] = file_info[FILE_LOC]
record[MD5_SUM] = file_info[MD5_SUM]
record[FILE_FORMAT] = (os.path.splitext(file_name)[1]).split('.')[1].lower()
record[UUID] = get_uuid_for_node("file", file_info[SHA512])
record[UUID] = self.schema.get_uuid_for_node("file", file_info[SHA512])
record[FILE_STAT] = DEFAULT_STAT
record[ACL] = DEFAULT_ACL
return record
Expand Down Expand Up @@ -300,12 +300,11 @@ def populate_manifest(self, manifest, neo4j_file, indexd_file, extracted_files):
os.remove(indexd_file)
return succeeded

@staticmethod
def get_indexd_manifest_name(file_name):
def get_indexd_manifest_name(self, file_name):
folder = os.path.dirname(file_name)
base_name = os.path.basename(file_name)
name, _ = os.path.splitext(base_name)
new_name = '{}_indexd{}'.format(name, INDEXD_MANIFEST_EXT)
new_name = '{}_indexd{}'.format(name, self.config.INDEXD_MANIFEST_EXT)
return os.path.join(folder, new_name)

@staticmethod
Expand Down Expand Up @@ -346,7 +345,7 @@ def handler(self, event):
for record in event['Records']:
start = timer()
end = start
temp_folder = os.path.join(TEMP_FOLDER, str(uuid.uuid4()))
temp_folder = os.path.join(self.config.TEMP_FOLDER, str(uuid.uuid4()))
try:
os.makedirs(temp_folder)
bucket = record['s3']['bucket']['name']
Expand Down Expand Up @@ -413,7 +412,7 @@ def send_success_email(self, file_name, final_path, file_list, manifests, loadin
content += '*Running time: {:.2f} seconds*\n'.format(running_time)

self.log.info('Sending success message to Slack ...')
send_slack_message(SLACK_URL, {"text": content}, self.log)
send_slack_message(self.config.SLACK_URL, {"text": content}, self.log)

self.log.info('Success message sent')

Expand All @@ -430,12 +429,12 @@ def listen(self):
self.log.info('PIMixture Processor service started!')
while True:
self.log.info("Receiving more messages...")
for msg in self.queue.receiveMsgs(VISIBILITY_TIMEOUT):
for msg in self.queue.receiveMsgs(self.config.VISIBILITY_TIMEOUT):
extender = None
try:
data = json.loads(msg.body)
if data and RECORDS in data and isinstance(data[RECORDS], list):
extender = VisibilityExtender(msg, VISIBILITY_TIMEOUT)
extender = VisibilityExtender(msg, self.config.VISIBILITY_TIMEOUT)
self.log.info('Start processing job ...')

if self.handler(data):
Expand Down Expand Up @@ -476,6 +475,7 @@ def load_manifests(self, manifests):

def main(args):
log = get_logger('Raw file processor - main')
config = BentoConfig(args.config_file)

if not args.queue:
log.error('Please specify queue name with -q/--queue argument')
Expand All @@ -486,12 +486,12 @@ def main(args):

password = args.password
if not password:
if PSWD_ENV not in os.environ:
if config.PSWD_ENV not in os.environ:
log.error(
'Password not specified! Please specify password with -p or --password argument, or set {} env var'.format( PSWD_ENV))
'Password not specified! Please specify password with -p or --password argument, or set {} env var'.format( config.PSWD_ENV))
sys.exit(1)
else:
password = os.environ[PSWD_ENV]
password = os.environ[config.PSWD_ENV]
user = args.user if args.user else 'neo4j'

if not args.schema:
Expand All @@ -516,7 +516,7 @@ def main(args):
props = Props(args.prop_file)
schema = ICDC_Schema(args.schema, props)
driver = neo4j.GraphDatabase.driver(uri, auth=(user, password))
processor = FileLoader(args.queue, driver, schema, args.bucket, args.s3_folder, args.dry_run)
processor = FileLoader(args.queue, driver, schema, config, args.bucket, args.s3_folder, args.dry_run)
processor.listen()

except neo4j.ServiceUnavailable as err:
Expand All @@ -539,11 +539,12 @@ def main(args):
parser.add_argument('-u', '--user', help='Neo4j user')
parser.add_argument('-p', '--password', help='Neo4j password')
parser.add_argument('-s', '--schema', help='Schema files', action='append')
parser.add_argument('--prop-file', help='Property file, example is in config/props.example.yml')
parser.add_argument('--config-file', help='Configuration file, example is in config/config.example.ini')
parser.add_argument('--prop-file', help='Property file, example is in config/props.example.yml', required=True)
parser.add_argument('--config-file', help='Configuration file, example is in config/config.example.ini', required=True)
parser.add_argument('-d', '--dry-run', help='Validations only, skip loading', action='store_true')
parser.add_argument('-m', '--max-violations', help='Max violations to display', nargs='?', type=int, default=10)
parser.add_argument('-b', '--bucket', help='Output (manifest) S3 bucket name')
parser.add_argument('-f', '--s3-folder', help='Output (manifest) S3 folder')
args = parser.parse_args()

main(args)
25 changes: 14 additions & 11 deletions loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from bento.common.props import Props
from bento.common.utils import get_logger, removeTrailingSlash, check_schema_files, DATETIME_FORMAT, get_host, \
UPSERT_MODE, NEW_MODE, DELETE_MODE
from bento.common.config import BACKUP_FOLDER, PSWD_ENV
from bento.common.config import BentoConfig
from bento.common.data_loader import DataLoader
from bento.common.s3 import S3Bucket

Expand All @@ -21,8 +21,9 @@ def parse_arguments():
parser.add_argument('-u', '--user', help='Neo4j user')
parser.add_argument('-p', '--password', help='Neo4j password')
parser.add_argument('-s', '--schema', help='Schema files', action='append', required=True)
parser.add_argument('--prop-file', help='Property file, example is in config/props.example.yml')
parser.add_argument('--config-file', help='Configuration file, example is in config/config.example.ini')
parser.add_argument('--prop-file', help='Property file, example is in config/props.example.yml', required=True)
parser.add_argument('--config-file', help='Configuration file, example is in config/config.example.ini',
required=True)
parser.add_argument('-c', '--cheat-mode', help='Skip validations, aka. Cheat Mode', action='store_true')
parser.add_argument('-d', '--dry-run', help='Validations only, skip loading', action='store_true')
parser.add_argument('--wipe-db', help='Wipe out database before loading, you\'ll lose all data!',
Expand All @@ -40,6 +41,8 @@ def parse_arguments():


def process_arguments(args, log):
config = BentoConfig(args.config_file)

directory = args.dir
if args.s3_folder:
if not os.path.exists(directory):
Expand Down Expand Up @@ -71,21 +74,22 @@ def process_arguments(args, log):

password = args.password
if not password:
if PSWD_ENV not in os.environ:
if config.PSWD_ENV not in os.environ:
log.error('Password not specified! Please specify password with -p or --password argument,' +
' or set {} env var'.format(PSWD_ENV))
' or set {} env var'.format(config.PSWD_ENV))
sys.exit(1)
else:
password = os.environ[PSWD_ENV]
password = os.environ[config.PSWD_ENV]
user = args.user if args.user else 'neo4j'
return (user, password, directory, uri)

return (user, password, directory, uri, config)


def backup_neo4j(backup_dir, name, address, log):
try:
restore_cmd = 'To restore DB from backup (to remove any changes caused by current data loading, run following commands:\n'
restore_cmd += '#' * 160 + '\n'
neo4j_cmd = 'neo4j-admin restore --from={}/{} --force'.format(BACKUP_FOLDER, name)
neo4j_cmd = 'neo4j-admin restore --from={}/{} --force'.format(backup_dir, name)
cmds = [
[
'mkdir',
Expand Down Expand Up @@ -123,8 +127,7 @@ def backup_neo4j(backup_dir, name, address, log):
def main():
log = get_logger('Loader')
args = parse_arguments()
user, password, directory, uri = process_arguments(args, log)

user, password, directory, uri, config = process_arguments(args, log)

if not check_schema_files(args.schema, log):
sys.exit(1)
Expand All @@ -143,7 +146,7 @@ def main():
host = get_host(uri)
restore_cmd = ''
if not args.no_backup and not args.dry_run:
restore_cmd = backup_neo4j(BACKUP_FOLDER, backup_name, host, log)
restore_cmd = backup_neo4j(config.BACKUP_FOLDER, backup_name, host, log)
if not restore_cmd:
log.error('Backup Neo4j failed, abort loading!')
sys.exit(1)
Expand Down
3 changes: 1 addition & 2 deletions model-converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
log = get_logger('Model Converter')
parser = argparse.ArgumentParser(description='Convert ICDC YAML schema to GraphQL schema')
parser.add_argument('-s', '--schema', help='Schema files', action='append')
parser.add_argument('--prop-file', help='Property file, example is in config/props.example.yml')
parser.add_argument('--config-file', help='Configuration file, example is in config/config.example.ini')
parser.add_argument('--prop-file', help='Property file, example is in config/props.example.yml', required=True)
parser.add_argument('query_file', help='Custom query file', type=argparse.FileType('r'))

parser.add_argument('graphql', help='Output GraphQL schema file name')
Expand Down
4 changes: 3 additions & 1 deletion tests/test_file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from file_loader import FileLoader
from bento.common.icdc_schema import ICDC_Schema
from bento.common.props import Props
from bento.common.config import BentoConfig
from bento.common.data_loader import DataLoader


Expand All @@ -19,7 +20,8 @@ def setUp(self):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
props = Props('../config/props.yml')
self.schema = ICDC_Schema(['data/icdc-model.yml', 'data/icdc-model-props.yml'], props)
self.processor = FileLoader('', self.driver, self.schema, 'ming-icdc-file-loader', 'Final/Data_loader/Manifests')
config = BentoConfig('../config/config.ini')
self.processor = FileLoader('', self.driver, self.schema, config, 'ming-icdc-file-loader', 'Final/Data_loader/Manifests')
self.loader = DataLoader(self.driver, self.schema)
self.file_list = [
"data/Dataset/COP-program.txt",
Expand Down
2 changes: 0 additions & 2 deletions tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ def test_remove_traling_slash(self):
def test_loader_construction(self):
self.assertRaises(Exception, DataLoader, None, None, None)
self.assertRaises(Exception, DataLoader, self.driver, None, None)
self.assertRaises(Exception, DataLoader, self.driver, self.schema , None)
self.assertRaises(Exception, DataLoader, self.driver, self.schema , ['a', 'b'])
self.assertIsInstance(self.loader, DataLoader)

def test_validate_parents_exist_in_file(self):
Expand Down
8 changes: 0 additions & 8 deletions tests/test_utils.py

This file was deleted.

1 change: 0 additions & 1 deletion unit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
CODE_FOLDER=$PWD
cd tests
export PYTHONPATH=$PYTHONPATH:$CODE_FOLDER
export ICDC_DATA_LOADER_CONFIG=$CODE_FOLDER/config/config.ini
python3 -m unittest

0 comments on commit 451c368

Please sign in to comment.