diff --git a/tools/cluster.py b/tools/cluster.py
index 3cf1828a..98c9f961 100755
--- a/tools/cluster.py
+++ b/tools/cluster.py
@@ -13,7 +13,7 @@ import subprocess
 from subprocess import check_output, check_call
 from itertools import chain
-from utils import tag_instances, get_masters, get_active_nodes
+from utils import tag_instances, get_mains, get_active_nodes
 from utils import check_call_with_timeout, ProcessTimeoutException
 import os
 import sys
@@ -38,13 +38,13 @@ default_instance_type = 'r3.xlarge'
 default_spot_price = '0.10'
 default_worker_instances = '1'
-default_master_instance_type = 'm3.xlarge'
+default_main_instance_type = 'm3.xlarge'
 default_region = 'us-east-1'
 default_zone = default_region + 'b'
 default_key_id = 'ignition_key'
 default_key_file = os.path.expanduser('~/.ssh/ignition_key.pem')
 default_ami = None # will be decided based on spark-ec2 list
-default_master_ami = None
+default_main_ami = None
 default_env = 'dev'
 default_spark_version = '1.3.0'
 default_spark_repo = 'https://github.com/chaordic/spark'
@@ -58,7 +58,7 @@ default_spark_ec2_git_branch = 'v4-yarn'
 
-master_post_create_commands = [
+main_post_create_commands = [
     'sudo', 'yum', '-y', 'install', 'tmux'
 ]
@@ -142,7 +142,7 @@ def call_ec2_script(args, timeout_total_minutes, timeout_inactivity_minutes):
 
 def cluster_exists(cluster_name, region):
     try:
-        get_master(cluster_name, region=region)
+        get_main(cluster_name, region=region)
         return True
     except Exception as e:
         return False
@@ -159,23 +159,23 @@ def parse_tags(tag_list):
         tags[k] = v
     return tags
 
-def save_cluster_args(master, key_file, remote_user, all_args):
-    ssh_call(user=remote_user, host=master, key_file=key_file,
+def save_cluster_args(main, key_file, remote_user, all_args):
+    ssh_call(user=remote_user, host=main, key_file=key_file,
         args=["echo '{}' > /tmp/cluster_args.json".format(json.dumps(all_args))])
 
-def load_cluster_args(master, key_file, remote_user):
-    return json.loads(ssh_call(user=remote_user, host=master, key_file=key_file,
+def load_cluster_args(main, key_file, remote_user):
+    return json.loads(ssh_call(user=remote_user, host=main, key_file=key_file,
         args=["cat", "/tmp/cluster_args.json"], get_output=True))
 
 # Util to be used by external scripts
-def save_extra_data(data_str, cluster_name, region=default_region, key_file=default_key_file, remote_user=default_remote_user, master=None):
-    master = master or get_master(cluster_name, region=region)
-    ssh_call(user=remote_user, host=master, key_file=key_file,
+def save_extra_data(data_str, cluster_name, region=default_region, key_file=default_key_file, remote_user=default_remote_user, main=None):
+    main = main or get_main(cluster_name, region=region)
+    ssh_call(user=remote_user, host=main, key_file=key_file,
         args=["echo '{}' > /tmp/cluster_extra_data.txt".format(data_str)])
 
-def load_extra_data(cluster_name, region=default_region, key_file=default_key_file, remote_user=default_remote_user, master=None):
-    master = master or get_master(cluster_name, region=region)
-    return ssh_call(user=remote_user, host=master, key_file=key_file,
+def load_extra_data(cluster_name, region=default_region, key_file=default_key_file, remote_user=default_remote_user, main=None):
+    main = main or get_main(cluster_name, region=region)
+    return ssh_call(user=remote_user, host=main, key_file=key_file,
         args=["cat", "/tmp/cluster_extra_data.txt"], get_output=True)
@@ -195,7 +195,7 @@ def tag_cluster_instances(cluster_name, tag=[], env=default_env, region=default_
 @argh.arg('-t', '--tag', action='append', type=str,
           help=tag_help_text)
-def launch(cluster_name, slaves,
+def launch(cluster_name, subordinates,
            key_file=default_key_file,
            env=default_env,
            tag=[],
@@ -206,11 +206,11 @@ def launch(cluster_name, slaves,
            security_group = None,
            vpc = None,
            vpc_subnet = None,
-           master_instance_type=default_master_instance_type,
+           main_instance_type=default_main_instance_type,
            wait_time='180',
            hadoop_major_version='2',
            worker_instances=default_worker_instances,
            retries_on_same_cluster=5,
            max_clusters_to_create=5,
-           minimum_percentage_healthy_slaves=0.9,
+           minimum_percentage_healthy_subordinates=0.9,
            remote_user=default_remote_user,
            script_timeout_total_minutes=55,
            script_timeout_inactivity_minutes=10,
@@ -219,7 +219,7 @@ def launch(cluster_name, slaves,
            spark_version=default_spark_version,
            spark_ec2_git_repo=default_spark_ec2_git_repo,
            spark_ec2_git_branch=default_spark_ec2_git_branch,
-           ami=default_ami, master_ami=default_master_ami):
+           ami=default_ami, main_ami=default_main_ami):
 
     all_args = locals()
 
@@ -252,24 +252,24 @@ def launch(cluster_name, slaves,
     spot_params = ['--spot-price', spot_price] if not ondemand else []
     ami_params = ['--ami', ami] if ami else []
-    master_ami_params = ['--master-ami', master_ami] if master_ami else []
+    main_ami_params = ['--main-ami', main_ami] if main_ami else []
 
     for i in range(retries_on_same_cluster):
         log.info('Running script, try %d of %d', i + 1, retries_on_same_cluster)
         try:
             call_ec2_script(['--identity-file', key_file,
                 '--key-pair', key_id,
-                '--slaves', slaves,
+                '--subordinates', subordinates,
                 '--region', region,
                 '--zone', zone,
                 '--instance-type', instance_type,
-                '--master-instance-type', master_instance_type,
+                '--main-instance-type', main_instance_type,
                 '--wait', wait_time,
                 '--hadoop-major-version', hadoop_major_version,
                 '--spark-ec2-git-repo', spark_ec2_git_repo,
                 '--spark-ec2-git-branch', spark_ec2_git_branch,
                 '--worker-instances', worker_instances,
-                '--master-opts', '-Dspark.worker.timeout={0}'.format(worker_timeout),
+                '--main-opts', '-Dspark.worker.timeout={0}'.format(worker_timeout),
                 '--spark-git-repo', spark_repo,
                 '-v', spark_version,
                 '--user-data', user_data,
@@ -278,7 +278,7 @@ def launch(cluster_name, slaves,
                 resume_param +
                 auth_params +
                 ami_params +
-                master_ami_params,
+                main_ami_params,
                 timeout_total_minutes=script_timeout_total_minutes,
                 timeout_inactivity_minutes=script_timeout_inactivity_minutes)
             success = True
@@ -297,11 +297,11 @@ def launch(cluster_name, slaves,
     try:
         if success:
-            master = get_master(cluster_name, region=region)
-            save_cluster_args(master, key_file, remote_user, all_args)
-            health_check(cluster_name=cluster_name, key_file=key_file, master=master, remote_user=remote_user, region=region)
-            ssh_call(user=remote_user, host=master, key_file=key_file, args=master_post_create_commands)
-            return master
+            main = get_main(cluster_name, region=region)
+            save_cluster_args(main, key_file, remote_user, all_args)
+            health_check(cluster_name=cluster_name, key_file=key_file, main=main, remote_user=remote_user, region=region)
+            ssh_call(user=remote_user, host=main, key_file=key_file, args=main_post_create_commands)
+            return main
     except Exception as e:
         log.exception('Got exception on last steps of cluster configuration')
         log.warn('Destroying unsuccessful cluster')
@@ -322,16 +322,16 @@ def destroy(cluster_name, delete_groups=False, region=default_region):
     p.communicate('y')
 
-def get_master(cluster_name, region=default_region):
-    masters = get_masters(cluster_name, region=region)
-    if not masters:
-        raise CommandError("No master on {}".format(cluster_name))
-    return masters[0].public_dns_name
+def get_main(cluster_name, region=default_region):
+    mains = get_mains(cluster_name, region=region)
+    if not mains:
+        raise CommandError("No main on {}".format(cluster_name))
+    return mains[0].public_dns_name
 
-def ssh_master(cluster_name, key_file=default_key_file, user=default_remote_user, region=default_region, *args):
-    master = get_master(cluster_name, region=region)
-    ssh_call(user=user, host=master, key_file=key_file, args=args)
+def ssh_main(cluster_name, key_file=default_key_file, user=default_remote_user, region=default_region, *args):
+    main = get_main(cluster_name, region=region)
+    ssh_call(user=user, host=main, key_file=key_file, args=args)
 
 def rsync_call(user, host, key_file, args=[], src_local='', dest_local='',
                remote_path='', tries=3):
@@ -355,8 +355,8 @@ def get_assembly_path():
 
 @arg('job-mem', help='The amount of memory to use for this job (like: 80G)')
-@arg('--master', help="This parameter overrides the master of cluster-name")
-@arg('--disable-tmux', help='Do not use tmux. Warning: many features will not work without tmux. Use only if the tmux is missing on the master.')
+@arg('--main', help="This parameter overrides the main host of cluster-name")
+@arg('--disable-tmux', help='Do not use tmux. Warning: many features will not work without tmux. Use only if tmux is missing on the main.')
 @arg('--detached', help='Run job in background, requires tmux')
 @arg('--destroy-cluster', help='Will destroy cluster after finishing the job')
 @named('run')
@@ -368,7 +368,7 @@ def job_run(cluster_name, job_name, job_mem,
             remote_user=default_remote_user,
             utc_job_date=None, job_tag=None,
             disable_wait_completion=False,
             collect_results_dir=default_collect_results_dir,
             remote_control_dir = default_remote_control_dir,
-            remote_path=None, master=None,
+            remote_path=None, main=None,
             disable_assembly_build=False,
             run_tests=False,
             kill_on_failure=False,
@@ -379,7 +379,7 @@ def job_run(cluster_name, job_name, job_mem,
         raise CommandError('UTC Job Date should be given as in the following example: {}'.format(utc_job_date_example))
     disable_tmux = disable_tmux and not detached
     wait_completion = not disable_wait_completion or destroy_cluster
-    master = master or get_master(cluster_name, region=region)
+    main = main or get_main(cluster_name, region=region)
 
     project_path = get_project_path()
     project_name = os.path.basename(project_path)
@@ -406,33 +406,33 @@ def job_run(cluster_name, job_name, job_mem,
     if assembly_path is None:
         raise Exception('Something is wrong: no assembly found')
 
-    ssh_call(user=remote_user, host=master, key_file=key_file,
+    ssh_call(user=remote_user, host=main, key_file=key_file,
         args=['mkdir', '-p', remote_path])
 
     rsync_call(user=remote_user,
-               host=master,
+               host=main,
                key_file=key_file,
                src_local=assembly_path,
                remote_path=with_leading_slash(remote_path))
 
     rsync_call(user=remote_user,
-               host=master,
+               host=main,
                key_file=key_file,
                src_local=remote_hook_local,
                remote_path=with_leading_slash(remote_path))
 
     log.info('Will run job in remote host')
     if disable_tmux:
-        ssh_call(user=remote_user, host=master, key_file=key_file, args=[non_tmux_arg], allocate_terminal=False)
+        ssh_call(user=remote_user, host=main, key_file=key_file, args=[non_tmux_arg], allocate_terminal=False)
     else:
-        ssh_call(user=remote_user, host=master, key_file=key_file, args=[tmux_arg], allocate_terminal=True)
+        ssh_call(user=remote_user, host=main, key_file=key_file, args=[tmux_arg], allocate_terminal=True)
 
     if wait_completion:
         failed = False
         failed_exception = None
         try:
             wait_for_job(cluster_name=cluster_name, job_name=job_name,
-                         job_tag=job_tag, key_file=key_file, master=master,
+                         job_tag=job_tag, key_file=key_file, main=main,
                          region=region,
                          job_timeout_minutes=job_timeout_minutes,
                          remote_user=remote_user,
                          remote_control_dir=remote_control_dir,
@@ -454,7 +454,7 @@ def job_run(cluster_name, job_name, job_mem,
             try:
                 kill_job(cluster_name=cluster_name, job_name=job_name,
                          job_tag=job_tag, key_file=key_file,
-                         master=master, remote_user=remote_user,
+                         main=main, remote_user=remote_user,
                          region=region,
                          remote_control_dir=remote_control_dir)
                 log.info('Killed!')
@@ -470,27 +470,27 @@ def job_run(cluster_name, job_name, job_mem,
 @named('attach')
 def job_attach(cluster_name, key_file=default_key_file,
                job_name=None, job_tag=None,
-               master=None, remote_user=default_remote_user, region=default_region):
+               main=None, remote_user=default_remote_user, region=default_region):
 
-    master = master or get_master(cluster_name, region=region)
+    main = main or get_main(cluster_name, region=region)
     args = ['tmux', 'attach']
     if job_name and job_tag:
         args += ['-t', 'spark.{0}.{1}'.format(job_name, job_tag)]
-    ssh_call(user=remote_user, host=master, key_file=key_file, args=args)
+    ssh_call(user=remote_user, host=main, key_file=key_file, args=args)
 
 class NotHealthyCluster(Exception):
     pass
 
 @named('health-check')
-def health_check(cluster_name, key_file=default_key_file, master=None, remote_user=default_remote_user, region=default_region):
-    master = master or get_master(cluster_name, region=region)
-    all_args = load_cluster_args(master, key_file, remote_user)
-    nslaves = int(all_args['slaves'])
-    minimum_percentage_healthy_slaves = all_args['minimum_percentage_healthy_slaves']
-    masters, slaves = get_active_nodes(cluster_name, region=region)
-    if nslaves == 0 or float(len(slaves)) / nslaves < minimum_percentage_healthy_slaves:
-        raise NotHealthyCluster('Not enough healthy slaves: {0}/{1}'.format(len(slaves), nslaves))
+def health_check(cluster_name, key_file=default_key_file, main=None, remote_user=default_remote_user, region=default_region):
+    main = main or get_main(cluster_name, region=region)
+    all_args = load_cluster_args(main, key_file, remote_user)
+    nsubordinates = int(all_args['subordinates'])
+    minimum_percentage_healthy_subordinates = all_args['minimum_percentage_healthy_subordinates']
+    mains, subordinates = get_active_nodes(cluster_name, region=region)
+    if nsubordinates == 0 or float(len(subordinates)) / nsubordinates < minimum_percentage_healthy_subordinates:
+        raise NotHealthyCluster('Not enough healthy subordinates: {0}/{1}'.format(len(subordinates), nsubordinates))
 
 class JobFailure(Exception):
     pass
 
@@ -511,16 +511,16 @@ def with_leading_slash(s):
 
 def collect_job_results(cluster_name, job_name, job_tag,
                         key_file=default_key_file, region=default_region,
-                        master=None, remote_user=default_remote_user,
+                        main=None, remote_user=default_remote_user,
                         remote_control_dir=default_remote_control_dir,
                         collect_results_dir=default_collect_results_dir):
 
-    master = master or get_master(cluster_name, region=region)
+    main = main or get_main(cluster_name, region=region)
     job_with_tag = get_job_with_tag(job_name, job_tag)
     job_control_dir = get_job_control_dir(remote_control_dir, job_with_tag)
 
     rsync_call(user=remote_user,
-               host=master,
+               host=main,
                # Keep the RUNNING file so we can kill the job if needed
               args=['--remove-source-files', '--exclude', 'RUNNING'],
               key_file=key_file,
@@ -532,13 +532,13 @@ def collect_job_results(cluster_name, job_name, job_tag,
 @named('wait-for')
 def wait_for_job(cluster_name, job_name, job_tag, key_file=default_key_file,
-                 master=None, remote_user=default_remote_user,
+                 main=None, remote_user=default_remote_user,
                  region=default_region,
                  remote_control_dir=default_remote_control_dir,
                  collect_results_dir=default_collect_results_dir,
                  job_timeout_minutes=0, max_failures=5,
                  seconds_to_sleep=60):
-    master = master or get_master(cluster_name, region=region)
+    main = main or get_main(cluster_name, region=region)
 
     job_with_tag = get_job_with_tag(job_name, job_tag)
 
@@ -559,7 +559,7 @@ def collect(show_tail):
         dest_log_dir = collect_job_results(cluster_name=cluster_name,
                                            job_name=job_name, job_tag=job_tag,
                                            key_file=key_file, region=region,
-                                           master=master, remote_user=remote_user,
+                                           main=main, remote_user=remote_user,
                                            remote_control_dir=remote_control_dir,
                                            collect_results_dir=collect_results_dir)
         log.info('Jobs results saved on: {}'.format(dest_log_dir))
@@ -586,7 +586,7 @@ def collect(show_tail):
     start_time = time.time()
     while True:
         try:
-            output = (ssh_call(user=remote_user, host=master, key_file=key_file,
+            output = (ssh_call(user=remote_user, host=main, key_file=key_file,
                 args=ssh_call_check_status, get_output=True) or '').strip()
             if output == 'SUCCESS':
                 log.info('Job finished successfully!')
@@ -608,7 +608,7 @@ def collect(show_tail):
                 ]
                 log.info('Will run some commands for posterior investigation of the problem')
                 for command in commands:
-                    ssh_call(user=remote_user, host=master, key_file=key_file, args=command)
+                    ssh_call(user=remote_user, host=main, key_file=key_file, args=command)
                 failures += 1
                 last_failure = 'Control missing'
             elif output == 'KILLED':
@@ -621,7 +621,7 @@ def collect(show_tail):
                 log.warn('Received unexpected response while checking job status: {}'.format(output))
                 failures += 1
                 last_failure = 'Unexpected response: {}'.format(output)
-                health_check(cluster_name=cluster_name, key_file=key_file, master=master, remote_user=remote_user, region=region)
+                health_check(cluster_name=cluster_name, key_file=key_file, main=main, remote_user=remote_user, region=region)
         except subprocess.CalledProcessError as e:
             failures += 1
             log.exception('Got exception')
@@ -639,16 +639,16 @@ def collect(show_tail):
 
 @named('kill')
 def kill_job(cluster_name, job_name, job_tag, key_file=default_key_file,
-             master=None, remote_user=default_remote_user,
+             main=None, remote_user=default_remote_user,
              region=default_region,
              remote_control_dir=default_remote_control_dir):
 
-    master = master or get_master(cluster_name, region=region)
+    main = main or get_main(cluster_name, region=region)
 
     job_with_tag = get_job_with_tag(job_name, job_tag)
     job_control_dir = get_job_control_dir(remote_control_dir, job_with_tag)
 
-    ssh_call(user=remote_user, host=master, key_file=key_file,
+    ssh_call(user=remote_user, host=main, key_file=key_file,
         args=['''{
             pid=$(cat %s/RUNNING)
             children=$(pgrep -P $pid)
@@ -658,11 +658,11 @@ def kill_job(cluster_name, job_name, job_tag, key_file=default_key_file,
 
 @named('killall')
 def killall_jobs(cluster_name,
-                 master=None, remote_user=default_remote_user,
+                 main=None, remote_user=default_remote_user,
                  region=default_region,
                  remote_control_dir=default_remote_control_dir):
-    master = master or get_master(cluster_name, region=region)
-    ssh_call(user=remote_user, host=master, key_file=key_file,
+    main = main or get_main(cluster_name, region=region)
+    ssh_call(user=remote_user, host=main, key_file=key_file,
         args=[
             '''for i in {remote_control_dir}/*/RUNNING; do
                 pid=$(cat $i)
@@ -675,7 +675,7 @@ def killall_jobs(cluster_name,
                  key_file=default_key_file,
 
 parser = ArghParser()
-parser.add_commands([launch, destroy, get_master, ssh_master, tag_cluster_instances, health_check])
+parser.add_commands([launch, destroy, get_main, ssh_main, tag_cluster_instances, health_check])
 parser.add_commands([job_run, job_attach, wait_for_job, kill_job, killall_jobs, collect_job_results],
                     namespace="jobs")
diff --git a/tools/spark-ec2/spark_ec2.py b/tools/spark-ec2/spark_ec2.py
index 5fdf0467..975f621c 100755
--- a/tools/spark-ec2/spark_ec2.py
+++ b/tools/spark-ec2/spark_ec2.py
@@ -84,11 +84,11 @@ def parse_args():
         prog="spark-ec2",
         version="%prog {v}".format(v=SPARK_EC2_VERSION),
         usage="%prog [options] <action> <cluster_name>\n\n"
-              + "<action> can be: launch, destroy, login, stop, start, get-master, reboot-slaves")
+              + "<action> can be: launch, destroy, login, stop, start, get-main, reboot-subordinates")
     parser.add_option(
-        "-s", "--slaves", type="int", default=1,
-        help="Number of slaves to launch (default: %default)")
+        "-s", "--subordinates", type="int", default=1,
+        help="Number of subordinates to launch (default: %default)")
     parser.add_option(
         "-w", "--wait", type="int",
         help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
@@ -103,21 +103,21 @@ def parse_args():
         help="Type of instance to launch (default: %default). " +
              "WARNING: must be 64-bit; small instances won't work")
     parser.add_option(
-        "-m", "--master-instance-type", default="",
-        help="Master instance type (leave empty for same as instance-type)")
+        "-m", "--main-instance-type", default="",
+        help="Main instance type (leave empty for same as instance-type)")
     parser.add_option(
         "-r", "--region", default="us-east-1",
         help="EC2 region zone to launch instances in")
     parser.add_option(
         "-z", "--zone", default="",
         help="Availability zone to launch instances in, or 'all' to spread " +
-             "slaves across multiple (an additional $0.01/Gb for bandwidth" +
+             "subordinates across multiple (an additional $0.01/Gb for bandwidth " +
              "between zones applies) (default: a single zone chosen at random)")
     parser.add_option(
         "-a", "--ami",
         help="Amazon Machine Image ID to use")
-    parser.add_option("--master-ami",
-                      help="Amazon Machine Image ID to use for the Master")
+    parser.add_option("--main-ami",
+                      help="Amazon Machine Image ID to use for the main")
     parser.add_option(
         "-v", "--spark-version", default=DEFAULT_SPARK_VERSION,
         help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
@@ -167,7 +167,7 @@ def parse_args():
         help="Swap space to set up per node, in MB (default: %default)")
     parser.add_option(
         "--spot-price", metavar="PRICE", type="float",
-        help="If specified, launch slaves as spot instances with the given " +
+        help="If specified, launch subordinates as spot instances with the given " +
              "maximum price (in dollars)")
     parser.add_option(
         "--ganglia", action="store_true", default=True,
@@ -183,14 +183,14 @@
         "--delete-groups", action="store_true", default=False,
         help="When destroying a cluster, delete the security groups that were created")
     parser.add_option(
-        "--use-existing-master", action="store_true", default=False,
-        help="Launch fresh slaves, but use an existing stopped master if possible")
+        "--use-existing-main", action="store_true", default=False,
+        help="Launch fresh subordinates, but use an existing stopped main if possible")
     parser.add_option(
         "--worker-instances", type="int", default=1,
         help="Number of instances per worker: variable SPARK_WORKER_INSTANCES (default: %default)")
     parser.add_option(
-        "--master-opts", type="string", default="",
-        help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
+        "--main-opts", type="string", default="",
+        help="Extra options to give to main through SPARK_MASTER_OPTS variable " +
             "(e.g -Dspark.worker.timeout=180)")
     parser.add_option(
         "--user-data", type="string", default="",
@@ -371,7 +371,7 @@ def get_spark_ami(instance_type, region, spark_ec2_git_repo, spark_ec2_git_branc
 
 # Launch a cluster of the given name, by setting up its security groups,
 # and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master and slaves
+# Returns a tuple of EC2 reservation objects for the main and subordinates
 # Fails if there already instances running in the cluster's groups.
 def launch_cluster(conn, opts, cluster_name):
     if opts.identity_file is None:
@@ -389,78 +389,78 @@ def launch_cluster(conn, opts, cluster_name):
     print "Setting up security groups..."
     if opts.security_group_prefix is None:
-        master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
-        slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
+        main_group = get_or_make_group(conn, cluster_name + "-main", opts.vpc_id)
+        subordinate_group = get_or_make_group(conn, cluster_name + "-subordinates", opts.vpc_id)
     else:
-        master_group = get_or_make_group(conn, opts.security_group_prefix + "-master", opts.vpc_id)
-        slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves", opts.vpc_id)
+        main_group = get_or_make_group(conn, opts.security_group_prefix + "-main", opts.vpc_id)
+        subordinate_group = get_or_make_group(conn, opts.security_group_prefix + "-subordinates", opts.vpc_id)
     authorized_address = opts.authorized_address
-    if master_group.rules == []: # Group was just now created
+    if main_group.rules == []: # Group was just now created
         if opts.vpc_id is None:
-            master_group.authorize(src_group=master_group)
-            master_group.authorize(src_group=slave_group)
+            main_group.authorize(src_group=main_group)
+            main_group.authorize(src_group=subordinate_group)
         else:
-            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                   src_group=master_group)
-            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                   src_group=master_group)
-            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                   src_group=master_group)
-            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                   src_group=slave_group)
-            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                   src_group=slave_group)
-            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                   src_group=slave_group)
-        master_group.authorize('tcp', 22, 22, authorized_address)
-        master_group.authorize('tcp', 8080, 8081, authorized_address)
-        master_group.authorize('tcp', 18080, 18080, authorized_address)
-        master_group.authorize('tcp', 19999, 19999, authorized_address)
-        master_group.authorize('tcp', 50030, 50030, authorized_address)
-        master_group.authorize('tcp', 50070, 50070, authorized_address)
-        master_group.authorize('tcp', 60070, 60070, authorized_address)
-        master_group.authorize('tcp', 4040, 4045, authorized_address)
+            main_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                 src_group=main_group)
+            main_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                 src_group=main_group)
+            main_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                 src_group=main_group)
+            main_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                 src_group=subordinate_group)
+            main_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                 src_group=subordinate_group)
+            main_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                 src_group=subordinate_group)
+        main_group.authorize('tcp', 22, 22, authorized_address)
+        main_group.authorize('tcp', 8080, 8081, authorized_address)
+        main_group.authorize('tcp', 18080, 18080, authorized_address)
+        main_group.authorize('tcp', 19999, 19999, authorized_address)
+        main_group.authorize('tcp', 50030, 50030, authorized_address)
+        main_group.authorize('tcp', 50070, 50070, authorized_address)
+        main_group.authorize('tcp', 60070, 60070, authorized_address)
+        main_group.authorize('tcp', 4040, 4045, authorized_address)
         if opts.ganglia:
-            master_group.authorize('tcp', 5080, 5080, authorized_address)
-    if slave_group.rules == []: # Group was just now created
+            main_group.authorize('tcp', 5080, 5080, authorized_address)
+    if subordinate_group.rules == []: # Group was just now created
         if opts.vpc_id is None:
-            slave_group.authorize(src_group=master_group)
-            slave_group.authorize(src_group=slave_group)
+            subordinate_group.authorize(src_group=main_group)
+            subordinate_group.authorize(src_group=subordinate_group)
         else:
-            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                  src_group=master_group)
-            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                  src_group=master_group)
-            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                  src_group=master_group)
-            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                  src_group=slave_group)
-            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                  src_group=slave_group)
-            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                  src_group=slave_group)
-        slave_group.authorize('tcp', 22, 22, authorized_address)
-        slave_group.authorize('tcp', 8080, 8081, authorized_address)
-        slave_group.authorize('tcp', 50060, 50060, authorized_address)
-        slave_group.authorize('tcp', 50075, 50075, authorized_address)
-        slave_group.authorize('tcp', 60060, 60060, authorized_address)
-        slave_group.authorize('tcp', 60075, 60075, authorized_address)
+            subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                        src_group=main_group)
+            subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                        src_group=main_group)
+            subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                        src_group=main_group)
+            subordinate_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
+                                        src_group=subordinate_group)
+            subordinate_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
+                                        src_group=subordinate_group)
+            subordinate_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
+                                        src_group=subordinate_group)
+        subordinate_group.authorize('tcp', 22, 22, authorized_address)
+        subordinate_group.authorize('tcp', 8080, 8081, authorized_address)
+        subordinate_group.authorize('tcp', 50060, 50060, authorized_address)
+        subordinate_group.authorize('tcp', 50075, 50075, authorized_address)
+        subordinate_group.authorize('tcp', 60060, 60060, authorized_address)
+        subordinate_group.authorize('tcp', 60075, 60075, authorized_address)
 
     # Check if instances are already running in our groups
-    existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
+    existing_mains, existing_subordinates = get_existing_cluster(conn, opts, cluster_name,
                                                              die_on_error=False)
-    if existing_slaves or (existing_masters and not opts.use_existing_master):
+    if existing_subordinates or (existing_mains and not opts.use_existing_main):
         print >> stderr, ("ERROR: There are already instances running in " +
-                          "group %s or %s" % (master_group.name, slave_group.name))
+                          "group %s or %s" % (main_group.name, subordinate_group.name))
         sys.exit(1)
 
     # Figure out Spark AMI
     if opts.ami is None:
         opts.ami = get_spark_ami(opts.instance_type, opts.region, opts.spark_ec2_git_repo, opts.spark_ec2_git_branch)
-    if opts.master_ami is None:
-        opts.master_ami = get_spark_ami(opts.master_instance_type, opts.region, opts.spark_ec2_git_repo, opts.spark_ec2_git_branch)
+    if opts.main_ami is None:
+        opts.main_ami = get_spark_ami(opts.main_instance_type, opts.region, opts.spark_ec2_git_repo, opts.spark_ec2_git_branch)
 
     # we use group ids to work around https://github.com/boto/boto/issues/350
     additional_group_ids = []
@@ -477,9 +477,9 @@ def launch_cluster(conn, opts, cluster_name):
         sys.exit(1)
 
     try:
-        master_image = conn.get_all_images(image_ids=[opts.master_ami])[0]
+        main_image = conn.get_all_images(image_ids=[opts.main_ami])[0]
     except:
-        print >> stderr, "Could not find AMI " + opts.master_ami
+        print >> stderr, "Could not find AMI " + opts.main_ami
         sys.exit(1)
 
     # Create block device mapping so that we can add EBS volumes if asked to.
@@ -499,31 +499,31 @@ def launch_cluster(conn, opts, cluster_name):
             name = '/dev/xvd' + string.letters[i + 1]
             block_map[name] = dev
 
-    # Launch slaves
+    # Launch subordinates
     if opts.spot_price is not None:
         # Launch spot instances with the requested price
-        print ("Requesting %d slaves as spot instances with price $%.3f" %
-               (opts.slaves, opts.spot_price))
+        print ("Requesting %d subordinates as spot instances with price $%.3f" %
+               (opts.subordinates, opts.spot_price))
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
         my_req_ids = []
         for zone in zones:
-            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
-            slave_reqs = conn.request_spot_instances(
+            num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i)
+            subordinate_reqs = conn.request_spot_instances(
                 price=opts.spot_price,
                 image_id=opts.ami,
                 launch_group="launch-group-%s" % cluster_name,
                 placement=zone,
-                count=num_slaves_this_zone,
+                count=num_subordinates_this_zone,
                 key_name=opts.key_pair,
-                security_group_ids=[slave_group.id] + additional_group_ids,
+                security_group_ids=[subordinate_group.id] + additional_group_ids,
                 instance_type=opts.instance_type,
                 block_device_map=block_map,
                 subnet_id=opts.subnet_id,
                 placement_group=opts.placement_group,
                 user_data=user_data_content)
-            my_req_ids += [req.id for req in slave_reqs]
+            my_req_ids += [req.id for req in subordinate_reqs]
             i += 1
 
         start_time = datetime.now()
@@ -538,16 +538,16 @@ def launch_cluster(conn, opts, cluster_name):
                 if len(invalid) > 0:
                     raise Exception("Invalid state for spot request: %s - status: %s" % (invalid[0].id, invalid[0].status.message))
-                if len(active_instance_ids) == opts.slaves:
-                    print "All %d slaves granted" % opts.slaves
+                if len(active_instance_ids) == opts.subordinates:
+                    print "All %d subordinates granted" % opts.subordinates
                     reservations = conn.get_all_reservations(active_instance_ids)
-                    slave_nodes = []
+                    subordinate_nodes = []
                     for r in reservations:
-                        slave_nodes += r.instances
+                        subordinate_nodes += r.instances
                     break
                 else:
-                    print "%d of %d slaves granted, waiting longer" % (
-                        len(active_instance_ids), opts.slaves)
+                    print "%d of %d subordinates granted, waiting longer" % (
+                        len(active_instance_ids), opts.subordinates)
                 if (datetime.now() - start_time).seconds > opts.spot_timeout * 60:
                     raise Exception("Timed out while waiting for spot instances")
@@ -556,9 +556,9 @@ def launch_cluster(conn, opts, cluster_name):
             print "Canceling spot instance requests"
             conn.cancel_spot_instance_requests(my_req_ids)
             # Log a warning if any of these requests actually launched instances:
-            (master_nodes, slave_nodes) = get_existing_cluster(
+            (main_nodes, subordinate_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            running = len(master_nodes) + len(slave_nodes)
+            running = len(main_nodes) + len(subordinate_nodes)
             if running:
                 print >> stderr, ("WARNING: %d instances are still running" % running)
             sys.exit(0)
@@ -567,41 +567,41 @@ def launch_cluster(conn, opts, cluster_name):
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
-        slave_nodes = []
+        subordinate_nodes = []
         for zone in zones:
-            num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
-            if num_slaves_this_zone > 0:
-                slave_res = image.run(key_name=opts.key_pair,
-                                      security_group_ids=[slave_group.id] + additional_group_ids,
+            num_subordinates_this_zone = get_partition(opts.subordinates, num_zones, i)
+            if num_subordinates_this_zone > 0:
+                subordinate_res = image.run(key_name=opts.key_pair,
+                                      security_group_ids=[subordinate_group.id] + additional_group_ids,
                                       instance_type=opts.instance_type,
                                       placement=zone,
-                                      min_count=num_slaves_this_zone,
-                                      max_count=num_slaves_this_zone,
+                                      min_count=num_subordinates_this_zone,
+                                      max_count=num_subordinates_this_zone,
                                       block_device_map=block_map,
                                       subnet_id=opts.subnet_id,
                                       placement_group=opts.placement_group,
                                       user_data=user_data_content)
-                slave_nodes += slave_res.instances
-                print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
-                                                                zone, slave_res.id)
+                subordinate_nodes += subordinate_res.instances
+                print "Launched %d subordinates in %s, regid = %s" % (num_subordinates_this_zone,
+                                                                      zone, subordinate_res.id)
             i += 1
 
-    # Launch or resume masters
-    if existing_masters:
-        print "Starting master..."
-        for inst in existing_masters:
+    # Launch or resume mains
+    if existing_mains:
+        print "Starting main..."
+        for inst in existing_mains:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
-        master_nodes = existing_masters
+        main_nodes = existing_mains
     else:
-        master_type = opts.master_instance_type
-        if master_type == "":
-            master_type = opts.instance_type
+        main_type = opts.main_instance_type
+        if main_type == "":
+            main_type = opts.instance_type
         if opts.zone == 'all':
             opts.zone = random.choice(conn.get_all_zones()).name
-        master_res = master_image.run(key_name=opts.key_pair,
-                                      security_group_ids=[master_group.id] + additional_group_ids,
-                                      instance_type=master_type,
+        main_res = main_image.run(key_name=opts.key_pair,
+                                  security_group_ids=[main_group.id] + additional_group_ids,
+                                  instance_type=main_type,
                                       placement=opts.zone,
                                       min_count=1,
                                       max_count=1,
@@ -610,48 +610,48 @@ def launch_cluster(conn, opts, cluster_name):
                                      placement_group=opts.placement_group,
                                      user_data=user_data_content)
 
-        master_nodes = master_res.instances
-        print "Launched master in %s, regid = %s" % (zone, master_res.id)
+        main_nodes = main_res.instances
+        print "Launched main in %s, regid = %s" % (zone, main_res.id)
 
     # This wait time corresponds to SPARK-4983
     print "Waiting for AWS to propagate instance metadata..."
     time.sleep(5)
 
     # Give the instances descriptive names
-    for master in master_nodes:
-        master.add_tag(
+    for main in main_nodes:
+        main.add_tag(
             key='Name',
-            value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
-    for slave in slave_nodes:
-        slave.add_tag(
+            value='{cn}-main-{iid}'.format(cn=cluster_name, iid=main.id))
+    for subordinate in subordinate_nodes:
+        subordinate.add_tag(
             key='Name',
-            value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
+            value='{cn}-subordinate-{iid}'.format(cn=cluster_name, iid=subordinate.id))
 
     # Return all the instances
-    return (master_nodes, slave_nodes)
+    return (main_nodes, subordinate_nodes)
 
 
 # Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters and slaves
+# Returns a tuple of lists of EC2 instance objects for the mains and subordinates
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     print "Searching for existing cluster " + cluster_name + "..."
     reservations = conn.get_all_reservations()
-    master_nodes = []
-    slave_nodes = []
+    main_nodes = []
+    subordinate_nodes = []
     for res in reservations:
         active = [i for i in res.instances if is_active(i)]
         for inst in active:
             group_names = [g.name for g in inst.groups]
-            if (cluster_name + "-master") in group_names:
-                master_nodes.append(inst)
-            elif (cluster_name + "-slaves") in group_names:
-                slave_nodes.append(inst)
-    if any((master_nodes, slave_nodes)):
-        print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))
-    if master_nodes != [] or not die_on_error:
-        return (master_nodes, slave_nodes)
+            if (cluster_name + "-main") in group_names:
+                main_nodes.append(inst)
+            elif (cluster_name + "-subordinates") in group_names:
+                subordinate_nodes.append(inst)
+    if any((main_nodes, subordinate_nodes)):
+        print "Found %d main(s), %d subordinates" % (len(main_nodes), len(subordinate_nodes))
+    if main_nodes != [] or not die_on_error:
+        return (main_nodes, subordinate_nodes)
     else:
-        if master_nodes == [] and slave_nodes != []:
-            print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master"
+        if main_nodes == [] and subordinate_nodes != []:
+            print >> sys.stderr, "ERROR: Could not find main in group " + cluster_name + "-main"
         else:
            print >> sys.stderr, "ERROR: Could not find any existing cluster"
         sys.exit(1)
@@ -661,21 +661,21 @@
 # or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
-    master = master_nodes[0].public_dns_name
+def setup_cluster(conn, main_nodes, subordinate_nodes, opts, deploy_ssh_key):
+    main = main_nodes[0].public_dns_name
     if deploy_ssh_key:
-        print "Generating cluster's SSH key on master..."
+        print "Generating cluster's SSH key on main..."
         key_setup = """
  [ -f ~/.ssh/id_rsa ] ||
    (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
     cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
 """
-        ssh(master, opts, key_setup)
-        dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
-        print "Transferring cluster's SSH key to slaves..."
-        for slave in slave_nodes:
-            print slave.public_dns_name
-            ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
+        ssh(main, opts, key_setup)
+        dot_ssh_tar = ssh_read(main, opts, ['tar', 'c', '.ssh'])
+        print "Transferring cluster's SSH key to subordinates..."
+        for subordinate in subordinate_nodes:
+            print subordinate.public_dns_name
+            ssh_write(subordinate.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
 
     modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
                'mapreduce', 'spark-standalone', 'tachyon']
 
@@ -688,10 +688,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
 
     # NOTE: We should clone the repository before running deploy_files to
     # prevent ec2-variables.sh from being overwritten
-    print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
+    print "Cloning spark-ec2 scripts from {r}/tree/{b} on main...".format(
         r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)
     ssh(
-        host=master,
+        host=main,
         opts=opts,
         command="rm -rf spark-ec2"
         + " && "
@@ -699,28 +699,28 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
                                                   b=opts.spark_ec2_git_branch)
     )
 
-    print "Deploying files to master..."
+    print "Deploying files to main..."
     deploy_files(
         conn=conn,
         root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
         opts=opts,
-        master_nodes=master_nodes,
-        slave_nodes=slave_nodes,
+        main_nodes=main_nodes,
+        subordinate_nodes=subordinate_nodes,
         modules=modules
     )
 
-    print "Running setup on master..."
-    setup_spark_cluster(master, opts)
+    print "Running setup on main..."
+    setup_spark_cluster(main, opts)
     print "Done!"
 
 
-def setup_spark_cluster(master, opts):
-    ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
-    ssh(master, opts, "spark-ec2/setup.sh")
-    print "Spark standalone cluster started at http://%s:8080" % master
+def setup_spark_cluster(main, opts):
+    ssh(main, opts, "chmod u+x spark-ec2/setup.sh")
+    ssh(main, opts, "spark-ec2/setup.sh")
+    print "Spark standalone cluster started at http://%s:8080" % main
 
     if opts.ganglia:
-        print "Ganglia started at http://%s:5080/ganglia" % master
+        print "Ganglia started at http://%s:5080/ganglia" % main
 
 
 def is_ssh_available(host, opts, print_ssh_output=True):
@@ -868,13 +868,13 @@ def get_num_disks(instance_type):
 
 # Deploy the configuration file templates in a given local directory to
 # a cluster, filling in any template parameters with information about the
-# cluster (e.g. lists of masters and slaves). Files are only deployed to
-# the first master instance in the cluster, and we expect the setup
+# cluster (e.g. lists of mains and subordinates). Files are only deployed to
+# the first main instance in the cluster, and we expect the setup
 # script to be run on that instance to copy them to other nodes.
 #
 # root_dir should be an absolute path to the directory with the files we want to deploy.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
-    active_master = master_nodes[0].public_dns_name
+def deploy_files(conn, root_dir, opts, main_nodes, subordinate_nodes, modules):
+    active_main = main_nodes[0].public_dns_name
 
     num_disks = get_num_disks(opts.instance_type)
     hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -886,7 +886,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
             mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
             spark_local_dirs += ",/mnt%d/spark" % i
 
-    cluster_url = "%s:7077" % active_master
+    cluster_url = "%s:7077" % active_main
 
     if opts.spark_version.startswith("http"):
         # Custom pre-built spark package
@@ -899,9 +899,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
 
     template_vars = {
-        "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
-        "active_master": active_master,
-        "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
+        "main_list": '\n'.join([i.public_dns_name for i in main_nodes]),
+        "active_main": active_main,
+        "subordinate_list": '\n'.join([i.public_dns_name for i in subordinate_nodes]),
         "cluster_url": cluster_url,
         "hdfs_data_dirs": hdfs_data_dirs,
         "mapred_local_dirs": mapred_local_dirs,
@@ -911,7 +911,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         "spark_version": spark_v,
         "hadoop_major_version": opts.hadoop_major_version,
         "spark_worker_instances": "%d" % opts.worker_instances,
-        "spark_master_opts": opts.master_opts
+        "spark_main_opts": opts.main_opts
     }
 
     if opts.copy_aws_credentials:
@@ -941,12 +941,12 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
                         text = text.replace("{{" + key + "}}", template_vars[key])
                     dest.write(text)
                     dest.close()
-    # rsync the whole directory over to the master machine
+    # rsync the whole directory over to the main machine
     command = [
         'rsync', '-rv',
         '-e', stringify_command(ssh_command(opts)),
         "%s/" % tmp_dir,
-        "%s@%s:/" % (opts.user, active_master)
+        "%s@%s:/" % (opts.user, active_main)
     ]
     subprocess.check_call(command)
     # Remove the temp directory we created above
@@ -1048,10 +1048,10 @@ def get_zones(conn, opts):
 
 # Gets the number of items in a partition
 def get_partition(total, num_partitions, current_partitions):
-    num_slaves_this_zone = total / num_partitions
+    num_subordinates_this_zone = total / num_partitions
     if (total % num_partitions) - current_partitions > 0:
-        num_slaves_this_zone += 1
-    return num_slaves_this_zone
+        num_subordinates_this_zone += 1
+    return num_subordinates_this_zone
 
 
 def real_main():
@@ -1086,11 +1086,11 @@ def real_main():
         print >> stderr, "Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
             t=opts.instance_type)
 
-    if opts.master_instance_type != "":
-        if opts.master_instance_type not in EC2_INSTANCE_TYPES:
+    if opts.main_instance_type != "":
+        if opts.main_instance_type not in EC2_INSTANCE_TYPES:
             print >> stderr, \
-                "Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
-                    t=opts.master_instance_type)
+                "Warning: Unrecognized EC2 instance type for main-instance-type: {t}".format(
+                    t=opts.main_instance_type)
 
     if opts.ebs_vol_num > 8:
         print >> stderr, "ebs-vol-num cannot be greater than 8"
@@ -1118,47 +1118,47 @@ def real_main():
         opts.zone = random.choice(conn.get_all_zones()).name
 
     if action == "launch":
-        if opts.slaves <= 0:
-            print >> sys.stderr, "ERROR: You have to start at least 1 slave"
+        if opts.subordinates <= 0:
+            print >> sys.stderr, "ERROR: You have to start at least 1 subordinate"
             sys.exit(1)
         if opts.resume:
-            (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
+            (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
         else:
-            (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
+            (main_nodes, subordinate_nodes) = launch_cluster(conn, opts, cluster_name)
         wait_for_cluster_state(
             conn=conn,
             opts=opts,
-            cluster_instances=(master_nodes + slave_nodes),
+            cluster_instances=(main_nodes + subordinate_nodes),
             cluster_state='ssh-ready'
         )
-        setup_cluster(conn, master_nodes, slave_nodes, opts, True)
+        setup_cluster(conn, main_nodes, subordinate_nodes, opts, True)
 
     elif action == "destroy":
         print "Are you sure you want to destroy the cluster %s?" % cluster_name
         print "The following instances will be terminated:"
-        (master_nodes, slave_nodes) = get_existing_cluster(
+        (main_nodes, subordinate_nodes) = get_existing_cluster(
             conn, opts, cluster_name, die_on_error=False)
-        for inst in master_nodes + slave_nodes:
+        for inst in main_nodes + subordinate_nodes:
             print "> %s" % inst.public_dns_name
 
         msg = "ALL DATA ON ALL NODES WILL BE LOST!!\nDestroy cluster %s (y/N): " % cluster_name
         response = raw_input(msg)
         if response == "y":
-            print "Terminating master..."
-            for inst in master_nodes:
+            print "Terminating main..."
+            for inst in main_nodes:
                 inst.terminate()
-            print "Terminating slaves..."
-            for inst in slave_nodes:
+            print "Terminating subordinates..."
+            for inst in subordinate_nodes:
                 inst.terminate()
 
             # Delete security groups as well
             if opts.delete_groups:
                 print "Deleting security groups (this will take some time)..."
-                group_names = [cluster_name + "-master", cluster_name + "-slaves"]
+                group_names = [cluster_name + "-main", cluster_name + "-subordinates"]
                 wait_for_cluster_state(
                     conn=conn,
                     opts=opts,
-                    cluster_instances=(master_nodes + slave_nodes),
+                    cluster_instances=(main_nodes + subordinate_nodes),
                     cluster_state='terminated'
                 )
                 attempt = 1
@@ -1200,32 +1200,32 @@ def real_main():
                 print "Try re-running in a few minutes."
 
     elif action == "login":
-        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        master = master_nodes[0].public_dns_name
-        print "Logging into master " + master + "..."
+        (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+        main = main_nodes[0].public_dns_name
+        print "Logging into main " + main + "..."
         proxy_opt = []
         if opts.proxy_port is not None:
             proxy_opt = ['-D', opts.proxy_port]
         subprocess.check_call(
-            ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
+            ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, main)])
 
-    elif action == "reboot-slaves":
+    elif action == "reboot-subordinates":
         response = raw_input(
             "Are you sure you want to reboot the cluster " +
-            cluster_name + " slaves?\n" +
-            "Reboot cluster slaves " + cluster_name + " (y/N): ")
+            cluster_name + " subordinates?\n" +
+            "Reboot cluster subordinates " + cluster_name + " (y/N): ")
         if response == "y":
-            (master_nodes, slave_nodes) = get_existing_cluster(
+            (main_nodes, subordinate_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            print "Rebooting slaves..."
-            for inst in slave_nodes:
+            print "Rebooting subordinates..."
+            for inst in subordinate_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     print "Rebooting " + inst.id
                     inst.reboot()
 
-    elif action == "get-master":
-        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        print master_nodes[0].public_dns_name
+    elif action == "get-main":
+        (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+        print main_nodes[0].public_dns_name
 
     elif action == "stop":
         response = raw_input(
             "Are you sure you want to stop the cluster " +
@@ -1233,17 +1233,17 @@ def real_main():
             cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
             "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
             "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
-            "All data on spot-instance slaves will be lost.\n" +
+            "All data on spot-instance subordinates will be lost.\n" +
             "Stop cluster " + cluster_name + " (y/N): ")
         if response == "y":
-            (master_nodes, slave_nodes) = get_existing_cluster(
+            (main_nodes, subordinate_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            print "Stopping master..."
-            for inst in master_nodes:
+            print "Stopping main..."
+            for inst in main_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     inst.stop()
-            print "Stopping slaves..."
-            for inst in slave_nodes:
+            print "Stopping subordinates..."
+            for inst in subordinate_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     if inst.spot_instance_request_id:
                         inst.terminate()
@@ -1251,22 +1251,22 @@ def real_main():
                         inst.stop()
 
     elif action == "start":
-        (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        print "Starting slaves..."
-        for inst in slave_nodes:
+        (main_nodes, subordinate_nodes) = get_existing_cluster(conn, opts, cluster_name)
+        print "Starting subordinates..."
+        for inst in subordinate_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
-        print "Starting master..."
-        for inst in master_nodes:
+        print "Starting main..."
+        for inst in main_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
         wait_for_cluster_state(
             conn=conn,
             opts=opts,
-            cluster_instances=(master_nodes + slave_nodes),
+            cluster_instances=(main_nodes + subordinate_nodes),
             cluster_state='ssh-ready'
         )
-        setup_cluster(conn, master_nodes, slave_nodes, opts, False)
+        setup_cluster(conn, main_nodes, subordinate_nodes, opts, False)
 
     else:
         print >> stderr, "Invalid action: %s" % action
diff --git a/tools/utils.py b/tools/utils.py
index bac56029..77af203b 100644
--- a/tools/utils.py
+++ b/tools/utils.py
@@ -16,22 +16,22 @@ def get_active_instances(conn):
     return active
 
 def parse_nodes(active_instances, cluster_name):
-    master_nodes = []
-    slave_nodes = []
+    main_nodes = []
+    subordinate_nodes = []
     for instance in active_instances:
         group_names = [g.name for g in instance.groups]
-        if (cluster_name + "-master") in group_names:
-            master_nodes.append(instance)
-        elif (cluster_name + "-slaves") in group_names:
-            slave_nodes.append(instance)
-    return (master_nodes, slave_nodes)
+        if (cluster_name + "-main") in group_names:
+            main_nodes.append(instance)
+        elif (cluster_name + "-subordinates") in group_names:
+            subordinate_nodes.append(instance)
+    return (main_nodes, subordinate_nodes)
 
-def get_masters(cluster_name, region):
+def get_mains(cluster_name, region):
     conn = boto.ec2.connect_to_region(region)
     active = get_active_instances(conn)
-    master_nodes, slave_nodes = parse_nodes(active, cluster_name)
-    return master_nodes
+    main_nodes, subordinate_nodes = parse_nodes(active, cluster_name)
+    return main_nodes
 
 def get_active_nodes(cluster_name, region):
     conn = boto.ec2.connect_to_region(region)
@@ -45,18 +45,18 @@ def tag_instances(cluster_name, tags, region):
     active = get_active_instances(conn)
     logging.info('%d active instances', len(active))
 
-    master_nodes, slave_nodes = parse_nodes(active, cluster_name)
-    logging.info('%d master, %d slave', len(master_nodes), len(slave_nodes))
+    main_nodes, subordinate_nodes = parse_nodes(active, cluster_name)
+    logging.info('%d main, %d subordinate', len(main_nodes), len(subordinate_nodes))
 
-    if master_nodes:
-        conn.create_tags([i.id for i in master_nodes],
-                         {'spark_node_type': 'master'})
-    if slave_nodes:
-        conn.create_tags([i.id for i in slave_nodes],
-                         {'spark_node_type': 'slave'})
+    if main_nodes:
+        conn.create_tags([i.id for i in main_nodes],
+                         {'spark_node_type': 'main'})
+    if subordinate_nodes:
+        conn.create_tags([i.id for i in subordinate_nodes],
+                         {'spark_node_type': 'subordinate'})
 
-    if slave_nodes or master_nodes:
-        ids = [i.id for l in (master_nodes, slave_nodes) for i in l]
+    if subordinate_nodes or main_nodes:
+        ids = [i.id for l in (main_nodes, subordinate_nodes) for i in l]
         conn.create_tags(ids, tags)
         logging.info("Tagged nodes.")