From 38b890ac9189ae0f4249c92714adb8c0a54bab0e Mon Sep 17 00:00:00 2001
From: Uri Laserson
Date: Mon, 17 Aug 2015 17:11:32 -0700
Subject: [PATCH] Reorg, incl splitting up util and adding ADAM variant
 conversion

Also moved old code to a temporary DELETE_ME dir
---
 {eggo => DELETE_ME}/dag.py                  |   0
 {eggo => DELETE_ME}/fabric_cli.py           |   0
 {eggo => DELETE_ME}/spark_ec2.py            |   0
 datasets/dbsnp/toast.py                     |   4 +-
 eggo/cluster/cli.py                         |  15 +-
 eggo/cluster/director.py                    | 255 +++----
 .../{config.py => resources/__init__.py}    |  22 --
 eggo/cluster/{ => resources}/aws.conf       |   0
 .../{ => resources}/cloudformation.template |   0
 eggo/config.py                              |  98 +------
 eggo/datasets/__init__.py                   |  37 +++
 eggo/util.py                                | 204 +++++++++++++-
 12 files changed, 300 insertions(+), 335 deletions(-)
 rename {eggo => DELETE_ME}/dag.py (100%)
 rename {eggo => DELETE_ME}/fabric_cli.py (100%)
 rename {eggo => DELETE_ME}/spark_ec2.py (100%)
 rename eggo/cluster/{config.py => resources/__init__.py} (54%)
 rename eggo/cluster/{ => resources}/aws.conf (100%)
 rename eggo/cluster/{ => resources}/cloudformation.template (100%)

diff --git a/eggo/dag.py b/DELETE_ME/dag.py
similarity index 100%
rename from eggo/dag.py
rename to DELETE_ME/dag.py
diff --git a/eggo/fabric_cli.py b/DELETE_ME/fabric_cli.py
similarity index 100%
rename from eggo/fabric_cli.py
rename to DELETE_ME/fabric_cli.py
diff --git a/eggo/spark_ec2.py b/DELETE_ME/spark_ec2.py
similarity index 100%
rename from eggo/spark_ec2.py
rename to DELETE_ME/spark_ec2.py
diff --git a/datasets/dbsnp/toast.py b/datasets/dbsnp/toast.py
index a8f380a..d85c51f 100755
--- a/datasets/dbsnp/toast.py
+++ b/datasets/dbsnp/toast.py
@@ -23,9 +23,11 @@
 
-from eggo.datasets import download_dataset_with_hadoop
+from eggo.datasets import download_dataset_with_hadoop, vcf_to_adam_variants
 
-hdfs_path = '/user/ec2-user/dbsnp/raw'
+raw_data_path = '/user/ec2-user/dbsnp/raw'
 
 with open(pjoin(osp.dirname(__file__), 'datapackage.json')) as ip:
     datapackage = json.load(ip)
 
-download_dataset_with_hadoop(datapackage, hdfs_path)
+download_dataset_with_hadoop(datapackage, raw_data_path)
+
+vcf_to_adam_variants(raw_data_path, adam_variants_path)
diff --git a/eggo/cluster/cli.py b/eggo/cluster/cli.py
index 2ae70dc..9711ba9 100644
--- a/eggo/cluster/cli.py
+++ b/eggo/cluster/cli.py
@@ -14,11 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os.path as osp + from click import group, option, Choice +from fabric.api import execute, get import eggo.cluster.director as director -from eggo.cluster.config import ( - DEFAULT_DIRECTOR_CONF_PATH, DEFAULT_CF_TEMPLATE_PATH) + + +DEFAULT_DIRECTOR_CONF_PATH = osp.join( + osp.dirname(__file__), 'resources', 'aws.conf') +DEFAULT_CF_TEMPLATE_PATH = osp.join( + osp.dirname(__file__), 'resources', 'cloudformation.template') # reusable options @@ -93,7 +100,7 @@ def login(region, stack_name, node): @option_stack_name def describe(region, stack_name): """Describe the EC2 instances in the cluster""" - director.list(region, stack_name) + director.describe(region, stack_name) @cli.command() @@ -109,7 +116,6 @@ def web_proxy(region, stack_name): @option_stack_name def get_director_log(region, stack_name): """DEBUG: get the Director application log from the launcher instance""" - from fabric.api import execute, get ec2_conn = director.create_ec2_connection(region) hosts = [director.get_launcher_instance(ec2_conn, stack_name).ip_address] execute( @@ -124,7 +130,6 @@ def get_director_log(region, stack_name): @option('-b', '--branch', default='master', show_default=True) def reinstall_eggo(region, stack_name, fork, branch): """DEBUG: reinstall a specific version of eggo""" - from fabric.api import execute ec2_conn = director.create_ec2_connection(region) hosts = [director.get_master_instance(ec2_conn, stack_name).ip_address] execute( diff --git a/eggo/cluster/director.py b/eggo/cluster/director.py index ce1f56e..a7c8eb5 100644 --- a/eggo/cluster/director.py +++ b/eggo/cluster/director.py @@ -14,175 +14,35 @@ # See the License for the specific language governing permissions and # limitations under the License. -import itertools -import os -import sys -import time from getpass import getuser -from tempfile import mkdtemp from datetime import datetime from cStringIO import StringIO -from subprocess import Popen, check_call, CalledProcessError -from contextlib import contextmanager -import boto.ec2 -import boto.cloudformation from boto.ec2.networkinterface import ( NetworkInterfaceCollection, NetworkInterfaceSpecification) -from boto.exception import BotoServerError from fabric.api import ( - sudo, local, run, execute, put, open_shell, env, parallel, cd) + sudo, run, execute, put, open_shell, env, parallel, cd) from fabric.contrib.files import append, exists from cm_api.api_client import ApiResource from eggo.error import EggoError -from eggo.cluster.config import (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, - EC2_KEY_PAIR, EC2_PRIVATE_KEY_FILE) +from eggo.config import ( + get_aws_access_key_id, get_aws_secret_access_key, get_ec2_key_pair, + get_ec2_private_key_file) +from eggo.util import ( + create_cf_connection, create_cf_stack, get_subnet_id, delete_stack, + get_security_group_id, create_ec2_connection, get_tagged_instances, + wait_for_instance_state, get_launcher_instance, get_manager_instance, + get_master_instance, get_worker_instances, non_blocking_tunnel, + http_tunnel_ctx) env.user = 'ec2-user' -env.key_filename = EC2_PRIVATE_KEY_FILE - - -def _sleep(start_time): - elapsed = (datetime.now() - start_time).seconds - if elapsed < 30: - time.sleep(5) - elif elapsed < 60: - time.sleep(10) - elif elapsed < 200: - time.sleep(20) - else: - time.sleep(elapsed / 10.) 
- - -# CLOUDFORMATION UTIL - -def wait_for_stack_status(cf_conn, stack_name, stack_status): - sys.stdout.write( - "Waiting for stack to enter '{s}' state.".format(s=stack_status)) - sys.stdout.flush() - start_time = datetime.now() - num_attempts = 0 - while True: - _sleep(start_time) - stack = cf_conn.describe_stacks(stack_name)[0] - if stack.stack_status == stack_status: - break - num_attempts += 1 - sys.stdout.write(".") - sys.stdout.flush() - sys.stdout.write("\n") - end_time = datetime.now() - print "Stack is now in '{s}' state. Waited {t} seconds.".format( - s=stack_status, t=(end_time - start_time).seconds) - - -def create_cf_connection(region): - return boto.cloudformation.connect_to_region(region) - - -def create_cf_stack(cf_conn, stack_name, cf_template_path, availability_zone): - try: - if len(cf_conn.describe_stacks(stack_name)) > 0: - print "Stack '{n}' already exists. Reusing.".format(n=stack_name) - return - except BotoServerError: - # stack does not exist - pass - - print "Creating stack with name '{n}'.".format(n=stack_name) - with open(cf_template_path, 'r') as template_file: - template_body=template_file.read() - cf_conn.create_stack(stack_name, template_body=template_body, - parameters=[('KeyPairName', EC2_KEY_PAIR), - ('AZ', availability_zone)], - tags={'owner': getuser(), - 'ec2_key_pair': EC2_KEY_PAIR}) - wait_for_stack_status(cf_conn, stack_name, 'CREATE_COMPLETE') - - -def get_stack_resource_id(cf_conn, stack_name, logical_resource_id): - for resource in cf_conn.describe_stack_resources(stack_name): - if resource.logical_resource_id == logical_resource_id: - return resource.physical_resource_id - return None - - -def get_subnet_id(cf_conn, stack_name): - return get_stack_resource_id(cf_conn, stack_name, 'DMZSubnet') - - -def get_security_group_id(cf_conn, stack_name): - return get_stack_resource_id(cf_conn, stack_name, 'ClusterSG') - - -def delete_stack(cf_conn, stack_name): - print "Deleting stack with name '{n}'.".format(n=stack_name) - cf_conn.delete_stack(stack_name) - wait_for_stack_status(cf_conn, stack_name, 'DELETE_COMPLETE') - - -# EC2 UTIL - -def create_ec2_connection(region): - return boto.ec2.connect_to_region(region) - - -def get_tagged_instances(ec2_conn, tags): - filters = [('tag:' + k, v) for (k, v) in tags.iteritems()] - instances = ec2_conn.get_only_instances(filters=filters) - return [i for i in instances - if i.state not in ["shutting-down", "terminated"]] - - -def get_launcher_instance(ec2_conn, stack_name): - return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, - 'eggo_node_type': 'launcher'})[0] - - -def get_manager_instance(ec2_conn, stack_name): - return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, - 'eggo_node_type': 'manager'})[0] - - -def get_worker_instances(ec2_conn, stack_name): - return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, - 'eggo_node_type': 'worker'}) - - -def get_master_instance(ec2_conn, stack_name): - return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, - 'eggo_node_type': 'master'})[0] - - -def wait_for_instance_state(ec2_conn, instance, state='running'): - sys.stdout.write( - "Waiting for instance to enter '{s}' state.".format(s=state)) - sys.stdout.flush() - start_time = datetime.now() - num_attempts = 0 - while True: - _sleep(start_time) - instance.update() - statuses = ec2_conn.get_all_instance_status(instance.id) - if len(statuses) > 0: - status = statuses[0] - if (instance.state == state and - status.system_status.status == 'ok' and - 
status.instance_status.status == 'ok'): - break - num_attempts += 1 - sys.stdout.write(".") - sys.stdout.flush() - sys.stdout.write("\n") - end_time = datetime.now() - print "Instance is now in '{s}' state. Waited {t} seconds.".format( - s=state, t=(end_time - start_time).seconds) +env.key_filename = get_ec2_private_key_file() def install_private_key(): - put(EC2_PRIVATE_KEY_FILE, 'id.pem') + put(get_ec2_private_key_file(), 'id.pem') run('chmod 600 id.pem') @@ -194,9 +54,9 @@ def install_director_client(): def create_launcher_instance(ec2_conn, cf_conn, stack_name, launcher_ami, launcher_instance_type): - launcher_instances = get_tagged_instances(ec2_conn, - {'eggo_stack_name': stack_name, - 'eggo_node_type': 'launcher'}) + launcher_instances = get_tagged_instances( + ec2_conn, {'eggo_stack_name': stack_name, + 'eggo_node_type': 'launcher'}) if len(launcher_instances) > 0: print "Launcher instance ({instance}) already exists. Reusing.".format( instance=launcher_instances[0].ip_address) @@ -211,13 +71,13 @@ def create_launcher_instance(ec2_conn, cf_conn, stack_name, launcher_ami, interfaces = NetworkInterfaceCollection(interface) reservation = ec2_conn.run_instances( launcher_ami, - key_name=EC2_KEY_PAIR, + key_name=get_ec2_key_pair(), instance_type=launcher_instance_type, network_interfaces=interfaces) launcher_instance = reservation.instances[0] launcher_instance.add_tag('owner', getuser()) - launcher_instance.add_tag('ec2_key_pair', EC2_KEY_PAIR) + launcher_instance.add_tag('ec2_key_pair', get_ec2_key_pair()) launcher_instance.add_tag('eggo_stack_name', stack_name) launcher_instance.add_tag('eggo_node_type', 'launcher') wait_for_instance_state(ec2_conn, launcher_instance) @@ -230,19 +90,19 @@ def run_director_bootstrap(director_conf_path, region, cluster_ami, num_workers, stack_name): # replace variables in conf template and copy to launcher cf_conn = create_cf_connection(region) - params = {'accessKeyId': AWS_ACCESS_KEY_ID, - 'secretAccessKey': AWS_SECRET_ACCESS_KEY, + params = {'accessKeyId': get_aws_access_key_id(), + 'secretAccessKey': get_aws_secret_access_key(), 'region': region, 'stack_name': stack_name, 'owner': getuser(), - 'keyName': EC2_KEY_PAIR, + 'keyName': get_ec2_key_pair(), 'subnetId': get_subnet_id(cf_conn, stack_name), 'securityGroupsIds': get_security_group_id(cf_conn, stack_name), 'image': cluster_ami, 'num_workers': num_workers} with open(director_conf_path, 'r') as template_file: - interpolated_body = template_file.read() % params - director_conf = StringIO(interpolated_body) + interpolated_body = template_file.read() % params + director_conf = StringIO(interpolated_body) put(director_conf, 'director.conf') # bootstrap the Hadoop cluster run('cloudera-director bootstrap director.conf') @@ -274,7 +134,7 @@ def provision(region, availability_zone, stack_name, cf_template_path, t=(end_time - start_time).seconds / 60) -def list(region, stack_name): +def describe(region, stack_name): ec2_conn = create_ec2_connection(region) print 'Launcher', get_launcher_instance(ec2_conn, stack_name).ip_address print 'Manager', get_manager_instance(ec2_conn, stack_name).ip_address @@ -284,6 +144,7 @@ def list(region, stack_name): def login(region, stack_name, node): + print('Logging into the {0} node...'.format(node)) ec2_conn = create_ec2_connection(region) if node == 'master': hosts = [get_master_instance(ec2_conn, stack_name).ip_address] @@ -296,34 +157,18 @@ def login(region, stack_name, node): execute(open_shell, hosts=hosts) -def start_web_proxy(public_ip, private_ip, port): - p = 
Popen('ssh -nNT -i {private_key} -o UserKnownHostsFile=/dev/null ' - '-o StrictHostKeyChecking=no -L {port}:{private_ip}:{port} ' - 'ec2-user@{public_ip}'.format( - private_key=EC2_PRIVATE_KEY_FILE, port=port, - private_ip=private_ip, public_ip=public_ip), - shell=True) - return p - - def web_proxy(region, stack_name): ec2_conn = create_ec2_connection(region) manager_instance = get_manager_instance(ec2_conn, stack_name) master_instance = get_master_instance(ec2_conn, stack_name) - # create (public_ip, private_ip, port) triples to proxy to - cm = (manager_instance.ip_address, - manager_instance.private_ip_address, - 7180) - rm = (master_instance.ip_address, - master_instance.private_ip_address, - 8088) - hs = (master_instance.ip_address, - master_instance.private_ip_address, - 19888) + # create (instance, port) triples to proxy to + cm = (manager_instance, 7180) + rm = (master_instance, 8088) + hs = (master_instance, 19888) targets = [cm, rm, hs] tunnels = [] for target in targets: - tunnels.append(start_web_proxy(*target)) + tunnels.append(non_blocking_tunnel(*target)) try: tunnels[-1].wait() finally: @@ -355,33 +200,6 @@ def teardown(region, stack_name): delete_stack(cf_conn, stack_name) -@contextmanager -def tunnel(instance, local_port, remote_port): - tunnel_cmd = ('ssh -nNT -i {key} -o UserKnownHostsFile=/dev/null ' - '-o StrictHostKeyChecking=no -L {local}:{private}:{remote} ' - 'ec2-user@{public}'.format( - key=EC2_PRIVATE_KEY_FILE, public=instance.ip_address, - private=instance.private_ip_address, local=local_port, - remote=remote_port)) - p = Popen(tunnel_cmd, shell=True) - # ssh take a bit to open up the connection, so we wait until we get a - # successful curl command to the local port - print('Attempting to connect through SSH tunnel; may take a few attempts') - start_time = datetime.now() - while True: - try: - check_call('curl http://localhost:{0}'.format(local_port), - shell=True) - # only reach this point if the curl cmd succeeded. 
-            break
-        except CalledProcessError:
-            _sleep(start_time)
-    try:
-        yield
-    finally:
-        p.terminate()
-
-
 def install_java_8(region, stack_name):
     # following general protocol for upgrading to JDK 1.8 here:
     # http://www.cloudera.com/content/cloudera/en/documentation/core/v5-3-x/topics/cdh_cm_upgrading_to_jdk8.html
@@ -397,7 +215,7 @@
                          server_port=64999, version=9)
     cloudera_manager = cm_api.get_cloudera_manager()
 
-    with tunnel(manager_instance, 64999, 7180):
+    with http_tunnel_ctx(manager_instance, 7180, 64999):
         # Stop Cloudera Management Service
         print "Stopping Cloudera Management Service"
         mgmt_service = cloudera_manager.get_service()
@@ -446,7 +264,7 @@ def start_cm_agents():
            sudo('service cloudera-scm-agent start')
     execute(start_cm_agents, hosts=cluster_hosts)
 
-    with tunnel(manager_instance, 64999, 7180):
+    with http_tunnel_ctx(manager_instance, 7180, 64999):
         # Start the cluster and the mgmt service
         print "Starting the cluster"
         cluster.start().wait()
@@ -467,6 +285,8 @@ def install_dev_tools():
     sudo('yum install -y cmake xz-devel ncurses ncurses-devel')
     sudo('yum install -y zlib zlib-devel snappy snappy-devel')
     sudo('yum install -y python-devel')
+    sudo('curl https://bootstrap.pypa.io/get-pip.py | python')
+    sudo('pip install -U pip setuptools')
 
 
 def install_git():
@@ -565,13 +385,22 @@ def install_eggo(fork='bigdatagenomics', branch='master', reinstall=False):
         sudo('python setup.py install')
 
 
+def install_env_vars(region, stack_name):
+    # NOTE: this sets cluster env vars to the PRIVATE IP addresses
+    manager = get_manager_instance(create_ec2_connection(region), stack_name)
+    append('/home/ec2-user/.bash_profile',
+           'export MANAGER_HOST={0}'.format(manager.private_ip_address))
+
+
 def config_cluster(region, stack_name):
     start_time = datetime.now()
 
     ec2_conn = create_ec2_connection(region)
     master_host = get_master_instance(ec2_conn, stack_name).ip_address
 
+    execute(install_private_key, hosts=[master_host])
     execute(create_hdfs_home, hosts=[master_host])
+    execute(install_env_vars, region, stack_name, hosts=[master_host])
     install_java_8(region, stack_name)
     execute(install_dev_tools, hosts=[master_host])
     execute(install_git, hosts=[master_host])
diff --git a/eggo/cluster/config.py b/eggo/cluster/resources/__init__.py
similarity index 54%
rename from eggo/cluster/config.py
rename to eggo/cluster/resources/__init__.py
index 432cb66..42131ee 100644
--- a/eggo/cluster/config.py
+++ b/eggo/cluster/resources/__init__.py
@@ -13,25 +13,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
- -import os -import os.path as osp - -from eggo.error import ConfigError - - -def get_env_var(name): - if name not in os.environ: - raise ConfigError('{0} environment variable is not set'.format(name)) - return os.environ[name] - - -AWS_ACCESS_KEY_ID = get_env_var('AWS_ACCESS_KEY_ID') -AWS_SECRET_ACCESS_KEY = get_env_var('AWS_SECRET_ACCESS_KEY') -EC2_KEY_PAIR = get_env_var('EC2_KEY_PAIR') -EC2_PRIVATE_KEY_FILE = get_env_var('EC2_PRIVATE_KEY_FILE') - - -DEFAULT_DIRECTOR_CONF_PATH = osp.join(osp.dirname(__file__), 'aws.conf') -DEFAULT_CF_TEMPLATE_PATH = osp.join(osp.dirname(__file__), - 'cloudformation.template') diff --git a/eggo/cluster/aws.conf b/eggo/cluster/resources/aws.conf similarity index 100% rename from eggo/cluster/aws.conf rename to eggo/cluster/resources/aws.conf diff --git a/eggo/cluster/cloudformation.template b/eggo/cluster/resources/cloudformation.template similarity index 100% rename from eggo/cluster/cloudformation.template rename to eggo/cluster/resources/cloudformation.template diff --git a/eggo/config.py b/eggo/config.py index 2e0f3df..aa53ecd 100644 --- a/eggo/config.py +++ b/eggo/config.py @@ -15,101 +15,27 @@ # limitations under the License. import os -from ConfigParser import SafeConfigParser -from eggo.util import random_id from eggo.error import ConfigError -# EGGO CONFIGURATION +def _get_env_var(name): + if name not in os.environ: + raise ConfigError('{0} environment variable is not set'.format(name)) + return os.environ[name] -def _init_eggo_config(): - defaults = {} - eggo_config = SafeConfigParser(defaults=defaults, - dict_type=dict) - # Read the local eggo config file - with open(os.environ['EGGO_CONFIG'], 'r') as ip: - eggo_config.readfp(ip, os.environ['EGGO_CONFIG']) +def get_aws_access_key_id(): + return _get_env_var('AWS_ACCESS_KEY_ID') - # Generate the random identifier for this module load - eggo_config.set('execution', 'random_id', random_id()) - # Set local (client) SPARK_HOME from environment if available - if 'SPARK_HOME' in os.environ: - eggo_config.set('client_env', - 'spark_home', - os.environ['SPARK_HOME']) +def get_aws_secret_access_key(): + return _get_env_var('AWS_SECRET_ACCESS_KEY') - # Set AWS variables from environment if available - if 'AWS_ACCESS_KEY_ID' in os.environ: - eggo_config.set('aws', - 'aws_access_key_id', - os.environ['AWS_ACCESS_KEY_ID']) - if 'AWS_SECRET_ACCESS_KEY' in os.environ: - eggo_config.set('aws', - 'aws_secret_access_key', - os.environ['AWS_SECRET_ACCESS_KEY']) - if 'EC2_KEY_PAIR' in os.environ: - eggo_config.set('aws', - 'ec2_key_pair', - os.environ['EC2_KEY_PAIR']) - if 'EC2_PRIVATE_KEY_FILE' in os.environ: - eggo_config.set('aws', - 'ec2_private_key_file', - os.environ['EC2_PRIVATE_KEY_FILE']) - return eggo_config +def get_ec2_key_pair(): + return _get_env_var('EC2_KEY_PAIR') -def assert_eggo_config_complete(c): - # read the "master" config file - ref = SafeConfigParser(dict_type=dict) - ref_config_file = os.path.join(os.environ['EGGO_HOME'], - 'conf/eggo/eggo.cfg') - with open(ref_config_file, 'r') as ip: - ref.readfp(ip, ref_config_file) - - def assert_section_complete(section): - if not set(ref.options(section)) <= set(c.options(section)): - raise ConfigError('Config pointed to by EGGO_CONFIG missing ' - 'options in section {0}'.format(section)) - - assert_section_complete('dfs') - assert_section_complete('execution') - assert_section_complete('versions') - assert_section_complete('client_env') - assert_section_complete('worker_env') - exec_ctx = c.get('execution', 'context') - if ref.has_section(exec_ctx): 
- assert_section_complete(exec_ctx) - - -def validate_eggo_config(c): - if (c.get('dfs', 'dfs_root_url').startswith('file:') and - c.get('execution', 'context') != 'local'): - raise ConfigError('Using local fs as target is only supported with ' - 'local execution') - - -eggo_config = _init_eggo_config() -# raise a ConfigError if there is a problem: -assert_eggo_config_complete(eggo_config) -validate_eggo_config(eggo_config) - -supported_formats = ['bdg'] # # TODO: support ga4gh - - -def generate_luigi_cfg(): - cfg = ('[core]\n' - 'logging_conf_file:{eggo_home}/conf/luigi/luigi_logging.cfg\n' - '[hadoop]\n' - 'command: hadoop\n') - return cfg.format(eggo_home=eggo_config.get('worker_env', 'eggo_home')) - - -# TOAST CONFIGURATION - -def validate_toast_config(d): - """Validate a JSON config file for an eggo dataset (a "toast").""" - pass +def get_ec2_private_key_file(): + return _get_env_var('EC2_PRIVATE_KEY_FILE') diff --git a/eggo/datasets/__init__.py b/eggo/datasets/__init__.py index ee8628e..cf4cbc7 100644 --- a/eggo/datasets/__init__.py +++ b/eggo/datasets/__init__.py @@ -19,6 +19,8 @@ from os.path import join as pjoin from subprocess import check_call +from cm_api.api_client import ApiResource + from eggo.util import make_local_tmp, make_hdfs_tmp @@ -26,6 +28,17 @@ 'hadoop-streaming-2.5.0-cdh5.3.3.jar') +def get_cluster_specs(): + cm_api = ApiResource(os.environ['MANAGER_HOST'], username='admin', + password='admin', server_port=7180, version=9) + host = list(cm_api.get_all_hosts())[0] # all hosts same instance type + cluster = list(cm_api.get_all_clusters())[0] + yarn = filter(lambda x: x.type == 'YARN', + list(cluster.get_all_services()))[0] + return {'num_worker_nodes': len(yarn.get_roles_by_type('NODEMANAGER')), + 'num_cores': host.numCores, 'node_memory': host.totalPhysMemBytes} + + def download_dataset_with_hadoop(datapackage, hdfs_path): with make_local_tmp() as tmp_local_dir: with make_hdfs_tmp(permissions='777') as tmp_hdfs_dir: @@ -73,3 +86,27 @@ def download_dataset_with_hadoop(datapackage, hdfs_path): check_call( 'hadoop fs -mv "{0}/*" {1}'.format( pjoin(tmp_hdfs_dir, 'staging'), hdfs_path), shell=True) + + +def vcf_to_adam_variants(input_path, output_path): + specs = get_cluster_specs() + cmd = ('adam/bin/adam-submit --master {master} ' + '--driver-memory {driver_memory} ' + '--num-executors {num_executors} ' + '--executor-cores {executor_cores} ' + '--executor-memory {executor_memory} ' + '-- ' + 'vcf2adam -onlyvariants {input} {output}') + cores_per_executor = min(4, specs['num_cores']) + executors_per_node = specs['num_cores'] / cores_per_executor + total_executors = executors_per_node * specs['num_worker_nodes'] + memory_per_executor = int(0.8 * specs['node_memory'] / executors_per_node) + args = {'input': input_path, + 'output': output_path, + 'master': 'yarn-client', + 'driver_memory': '8g', + 'num_executors': total_executors, + 'executor_cores': cores_per_executor, + 'executor_memory': memory_per_executor} + print(cmd.format(**args)) + check_call(cmd.format(**args), shell=True) diff --git a/eggo/util.py b/eggo/util.py index 954a63b..99403af 100644 --- a/eggo/util.py +++ b/eggo/util.py @@ -16,17 +16,26 @@ import os import re +import sys +import time import random import string from os.path import join as pjoin +from getpass import getuser from uuid import uuid4 from shutil import rmtree from hashlib import md5 from tempfile import mkdtemp from datetime import datetime -from subprocess import check_call +from subprocess import check_call, Popen, CalledProcessError from 
contextlib import contextmanager +import boto.ec2 +import boto.cloudformation +from boto.exception import BotoServerError + +from eggo.config import get_ec2_key_pair, get_ec2_private_key_file + def uuid(): return uuid4().hex @@ -39,9 +48,21 @@ def random_id(prefix='tmp_eggo', n=4): rand=rand_string) -def ensure_dir(path): - if not os.path.exists(path): - os.makedirs(path) +def sleep_progressive(start_time): + elapsed = (datetime.now() - start_time).seconds + if elapsed < 30: + time.sleep(5) + elif elapsed < 60: + time.sleep(10) + elif elapsed < 200: + time.sleep(20) + else: + time.sleep(elapsed / 10.) + + +# ============== +# FILE UTILITIES +# ============== def sanitize(dirty): @@ -67,8 +88,8 @@ def uri_to_sanitized_filename(source_uri, decompress=False): @contextmanager -def make_local_tmp(prefix='tmp_eggo_', dir=None): - tmpdir = mkdtemp(prefix=prefix, dir=dir) +def make_local_tmp(prefix='tmp_eggo_', dir_=None): + tmpdir = mkdtemp(prefix=prefix, dir=dir_) try: yield tmpdir finally: @@ -76,8 +97,8 @@ def make_local_tmp(prefix='tmp_eggo_', dir=None): @contextmanager -def make_hdfs_tmp(prefix='tmp_eggo', dir='/tmp', permissions='755'): - tmpdir = pjoin(dir, '_'.join([prefix, uuid()])) +def make_hdfs_tmp(prefix='tmp_eggo', dir_='/tmp', permissions='755'): + tmpdir = pjoin(dir_, '_'.join([prefix, uuid()])) check_call('hadoop fs -mkdir {0}'.format(tmpdir).split()) if permissions != '755': check_call( @@ -86,3 +107,170 @@ def make_hdfs_tmp(prefix='tmp_eggo', dir='/tmp', permissions='755'): yield tmpdir finally: check_call('hadoop fs -rm -r {0}'.format(tmpdir).split()) + + +# ============= +# AWS UTILITIES +# ============= + + +def non_blocking_tunnel(instance, remote_port, local_port=None, user='ec2-user', + private_key=None): + if local_port is None: + local_port = remote_port + if private_key is None: + private_key = get_ec2_private_key_file() + p = Popen('ssh -nNT -i {private_key} -o UserKnownHostsFile=/dev/null ' + '-o StrictHostKeyChecking=no -L {local}:{private_ip}:{remote} ' + '{user}@{public_ip}'.format( + private_key=private_key, public_ip=instance.ip_address, + private_ip=instance.private_ip_address, user=user, + local=local_port, remote=remote_port), + shell=True) + return p + + +@contextmanager +def http_tunnel_ctx(instance, remote_port, local_port=None, user='ec2-user', + private_key=None): + tunnel_process = non_blocking_tunnel(instance, remote_port, local_port, + user, private_key) + # ssh may take a bit to open up the connection, so we wait until we get a + # successful curl command to the local port + print('Attempting to curl to SSH tunnel; may take a few attempts\n') + start_time = datetime.now() + while True: + try: + check_call('curl http://localhost:{0}'.format(local_port), + shell=True) + # only reach this point if the curl cmd succeeded. + break + except CalledProcessError: + sleep_progressive(start_time) + try: + yield + finally: + tunnel_process.terminate() + + +# CLOUDFORMATION UTIL + + +def wait_for_stack_status(cf_conn, stack_name, stack_status): + sys.stdout.write( + "Waiting for stack to enter '{s}' state.".format(s=stack_status)) + sys.stdout.flush() + start_time = datetime.now() + while True: + sleep_progressive(start_time) + stack = cf_conn.describe_stacks(stack_name)[0] + if stack.stack_status == stack_status: + break + sys.stdout.write(".") + sys.stdout.flush() + sys.stdout.write("\n") + end_time = datetime.now() + print "Stack is now in '{s}' state. 
Waited {t} seconds.".format( + s=stack_status, t=(end_time - start_time).seconds) + + +def create_cf_connection(region): + return boto.cloudformation.connect_to_region(region) + + +def create_cf_stack(cf_conn, stack_name, cf_template_path, availability_zone): + try: + if len(cf_conn.describe_stacks(stack_name)) > 0: + print "Stack '{n}' already exists. Reusing.".format(n=stack_name) + return + except BotoServerError: + # stack does not exist + pass + + print "Creating stack with name '{n}'.".format(n=stack_name) + with open(cf_template_path, 'r') as template_file: + template_body=template_file.read() + cf_conn.create_stack(stack_name, template_body=template_body, + parameters=[('KeyPairName', get_ec2_key_pair()), + ('AZ', availability_zone)], + tags={'owner': getuser(), + 'ec2_key_pair': get_ec2_key_pair()}) + wait_for_stack_status(cf_conn, stack_name, 'CREATE_COMPLETE') + + +def get_stack_resource_id(cf_conn, stack_name, logical_resource_id): + for resource in cf_conn.describe_stack_resources(stack_name): + if resource.logical_resource_id == logical_resource_id: + return resource.physical_resource_id + return None + + +def get_subnet_id(cf_conn, stack_name): + return get_stack_resource_id(cf_conn, stack_name, 'DMZSubnet') + + +def get_security_group_id(cf_conn, stack_name): + return get_stack_resource_id(cf_conn, stack_name, 'ClusterSG') + + +def delete_stack(cf_conn, stack_name): + print "Deleting stack with name '{n}'.".format(n=stack_name) + cf_conn.delete_stack(stack_name) + wait_for_stack_status(cf_conn, stack_name, 'DELETE_COMPLETE') + + +# EC2 UTIL + + +def create_ec2_connection(region): + return boto.ec2.connect_to_region(region) + + +def get_tagged_instances(ec2_conn, tags): + filters = [('tag:' + k, v) for (k, v) in tags.iteritems()] + instances = ec2_conn.get_only_instances(filters=filters) + return [i for i in instances + if i.state not in ["shutting-down", "terminated"]] + + +def get_launcher_instance(ec2_conn, stack_name): + return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, + 'eggo_node_type': 'launcher'})[0] + + +def get_manager_instance(ec2_conn, stack_name): + return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, + 'eggo_node_type': 'manager'})[0] + + +def get_master_instance(ec2_conn, stack_name): + return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, + 'eggo_node_type': 'master'})[0] + + +def get_worker_instances(ec2_conn, stack_name): + return get_tagged_instances(ec2_conn, {'eggo_stack_name': stack_name, + 'eggo_node_type': 'worker'}) + + +def wait_for_instance_state(ec2_conn, instance, state='running'): + sys.stdout.write( + "Waiting for instance to enter '{s}' state.".format(s=state)) + sys.stdout.flush() + start_time = datetime.now() + while True: + sleep_progressive(start_time) + instance.update() + statuses = ec2_conn.get_all_instance_status(instance.id) + if len(statuses) > 0: + status = statuses[0] + if (instance.state == state and + status.system_status.status == 'ok' and + status.instance_status.status == 'ok'): + break + sys.stdout.write(".") + sys.stdout.flush() + sys.stdout.write("\n") + end_time = datetime.now() + print "Instance is now in '{s}' state. Waited {t} seconds.".format( + s=state, t=(end_time - start_time).seconds)
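
Note on the Spark settings chosen by vcf_to_adam_variants: the sketch below
reruns the same sizing arithmetic outside the cluster so the generated
adam-submit flags can be sanity-checked. The cluster shape (4 worker nodes
with 16 cores and 64 GiB each) and the dbsnp output path are illustrative
assumptions, not values taken from this patch; only the formulas mirror
eggo/datasets/__init__.py above.

# Standalone sketch of the executor sizing done in vcf_to_adam_variants.
# The cluster specs below are assumptions; on a real cluster they would come
# from get_cluster_specs() via the Cloudera Manager API.
specs = {'num_worker_nodes': 4,
         'num_cores': 16,                # cores per worker
         'node_memory': 64 * 1024 ** 3}  # bytes of RAM per worker

cores_per_executor = min(4, specs['num_cores'])                    # -> 4
executors_per_node = specs['num_cores'] // cores_per_executor      # -> 4
total_executors = executors_per_node * specs['num_worker_nodes']   # -> 16
# 80% of node memory split across that node's executors, in bytes (as in the
# patch); roughly 12.8 GiB here
memory_per_executor = int(0.8 * specs['node_memory'] / executors_per_node)

cmd = ('adam/bin/adam-submit --master yarn-client '
       '--driver-memory 8g '
       '--num-executors {num_executors} '
       '--executor-cores {executor_cores} '
       '--executor-memory {executor_memory} '
       '-- '
       'vcf2adam -onlyvariants {input} {output}')
print(cmd.format(num_executors=total_executors,
                 executor_cores=cores_per_executor,
                 executor_memory=memory_per_executor,
                 # the raw path matches toast.py; the ADAM output path is a
                 # hypothetical sibling directory
                 input='/user/ec2-user/dbsnp/raw',
                 output='/user/ec2-user/dbsnp/adam_variants'))

With these assumed specs the command asks YARN for 16 executors with 4 cores
each and roughly 12.8 GiB (expressed in bytes) per executor.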