|
| 1 | +#!/usr/bin/env python2.6 |
| 2 | + |
| 3 | +# Copyright 2013-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 4 | +# |
| 5 | +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the |
| 6 | +# License. A copy of the License is located at |
| 7 | +# |
| 8 | +# http://aws.amazon.com/apache2.0/ |
| 9 | +# |
| 10 | +# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES |
| 11 | +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and |
| 12 | +# limitations under the License. |
| 13 | + |
| 14 | +__author__ = 'seaam' |
| 15 | + |
| 16 | +import ConfigParser |
| 17 | +import boto3 |
| 18 | +import os |
| 19 | +import sys |
| 20 | +import time |
| 21 | +import logging |
| 22 | +import json |
| 23 | +from botocore.exceptions import ClientError |
| 24 | +from botocore.config import Config |
| 25 | + |
| 26 | +log = logging.getLogger(__name__) |
| 27 | +pricing_file = '/opt/cfncluster/instances.json' |
| 28 | +cfnconfig_file = '/opt/cfncluster/cfnconfig' |
| 29 | + |
| 30 | + |
| 31 | +def load_scheduler_module(scheduler): |
| 32 | + scheduler = 'jobwatcher.plugins.' + scheduler |
| 33 | + _scheduler = __import__(scheduler) |
| 34 | + _scheduler = sys.modules[scheduler] |
| 35 | + |
| 36 | + log.debug("scheduler=%s" % repr(_scheduler)) |
| 37 | + |
| 38 | + return _scheduler |
| 39 | + |
| 40 | + |
| 41 | +def get_asg_name(stack_name, region, proxy_config): |
| 42 | + asg_conn = boto3.client('autoscaling', region_name=region, config=proxy_config) |
| 43 | + asg_name = "" |
| 44 | + no_asg = True |
| 45 | + |
| 46 | + while no_asg: |
| 47 | + try: |
| 48 | + r = asg_conn.describe_tags(Filters=[{'Name': 'value', 'Values': [stack_name]}]) |
| 49 | + asg_name = r.get('Tags')[0].get('ResourceId') |
| 50 | + no_asg = False |
| 51 | + except IndexError as e: |
| 52 | + log.error("No asg found for cluster %s" % stack_name) |
| 53 | + time.sleep(30) |
| 54 | + |
| 55 | + return asg_name |
| 56 | + |
| 57 | + |
| 58 | +def read_cfnconfig(): |
| 59 | + cfnconfig_params = {} |
| 60 | + with open(cfnconfig_file) as f: |
| 61 | + for kvp in f: |
| 62 | + key, value = kvp.partition('=')[::2] |
| 63 | + cfnconfig_params[key.strip()] = value.strip() |
| 64 | + return cfnconfig_params |
| 65 | + |
| 66 | + |
| 67 | +def get_vcpus_from_pricing_file(instance_type): |
| 68 | + with open(pricing_file) as f: |
| 69 | + instances = json.load(f) |
| 70 | + try: |
| 71 | + vcpus = int(instances[instance_type]["vcpus"]) |
| 72 | + log.info("Instance %s has %s vcpus." % (instance_type, vcpus)) |
| 73 | + return vcpus |
| 74 | + except KeyError as e: |
| 75 | + log.error("Instance %s not found in file %s." % (instance_type, pricing_file)) |
| 76 | + exit(1) |
| 77 | + |
| 78 | + |
| 79 | +def get_instance_properties(instance_type): |
| 80 | + cfnconfig_params = read_cfnconfig() |
| 81 | + try: |
| 82 | + cfn_scheduler_slots = cfnconfig_params["cfn_scheduler_slots"] |
| 83 | + slots = 0 |
| 84 | + vcpus = get_vcpus_from_pricing_file(instance_type) |
| 85 | + |
| 86 | + if cfn_scheduler_slots == "cores": |
| 87 | + log.info("Instance %s will use number of cores as slots based on configuration." % instance_type) |
| 88 | + slots = -(-vcpus//2) |
| 89 | + elif cfn_scheduler_slots == "vcpus": |
| 90 | + log.info("Instance %s will use number of vcpus as slots based on configuration." % instance_type) |
| 91 | + slots = vcpus |
| 92 | + elif cfn_scheduler_slots.isdigit(): |
| 93 | + slots = int(cfn_scheduler_slots) |
| 94 | + log.info("Instance %s will use %s slots based on configuration." % (instance_type, slots)) |
| 95 | + |
| 96 | + if not slots > 0: |
| 97 | + log.critical("cfn_scheduler_slots config parameter '%s' was invalid" % cfn_scheduler_slots) |
| 98 | + exit(1) |
| 99 | + |
| 100 | + return {'slots': slots} |
| 101 | + |
| 102 | + except KeyError: |
| 103 | + log.error("Required config parameter 'cfn_scheduler_slots' not found in file %s." % cfnconfig_file) |
| 104 | + exit(1) |
| 105 | + |
| 106 | + |
| 107 | +def fetch_pricing_file(proxy_config, cfncluster_dir, region): |
| 108 | + s3 = boto3.resource('s3', region_name=region, config=proxy_config) |
| 109 | + try: |
| 110 | + if not os.path.exists(cfncluster_dir): |
| 111 | + os.makedirs(cfncluster_dir) |
| 112 | + except OSError as ex: |
| 113 | + log.critical('Could not create directory %s. Failed with exception: %s' % (cfncluster_dir, ex)) |
| 114 | + raise |
| 115 | + bucket_name = '%s-cfncluster' % region |
| 116 | + try: |
| 117 | + bucket = s3.Bucket(bucket_name) |
| 118 | + bucket.download_file('instances/instances.json', '%s/instances.json' % cfncluster_dir) |
| 119 | + except ClientError as e: |
| 120 | + log.critical("Could not save instance mapping file %s/instances.json from S3 bucket %s. Failed with exception: %s" % (cfncluster_dir, bucket_name, e)) |
| 121 | + raise |
| 122 | + |
| 123 | + |
| 124 | +def main(): |
| 125 | + logging.basicConfig( |
| 126 | + level=logging.INFO, |
| 127 | + format='%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s' |
| 128 | + ) |
| 129 | + |
| 130 | + _configfilename = "/etc/jobwatcher.cfg" |
| 131 | + log.info("Reading configuration file %s" % _configfilename) |
| 132 | + config = ConfigParser.RawConfigParser() |
| 133 | + config.read(_configfilename) |
| 134 | + if config.has_option('jobwatcher', 'loglevel'): |
| 135 | + lvl = logging._levelNames[config.get('jobwatcher', 'loglevel')] |
| 136 | + logging.getLogger().setLevel(lvl) |
| 137 | + region = config.get('jobwatcher', 'region') |
| 138 | + scheduler = config.get('jobwatcher', 'scheduler') |
| 139 | + stack_name = config.get('jobwatcher', 'stack_name') |
| 140 | + instance_type = config.get('jobwatcher', 'compute_instance_type') |
| 141 | + cfncluster_dir = config.get('jobwatcher', 'cfncluster_dir') |
| 142 | + _proxy = config.get('jobwatcher', 'proxy') |
| 143 | + proxy_config = Config() |
| 144 | + |
| 145 | + if not _proxy == "NONE": |
| 146 | + proxy_config = Config(proxies={'https': _proxy}) |
| 147 | + log.info("Configured proxy is: %s" % _proxy) |
| 148 | + |
| 149 | + try: |
| 150 | + asg_name = config.get('jobwatcher', 'asg_name') |
| 151 | + except ConfigParser.NoOptionError: |
| 152 | + asg_name = get_asg_name(stack_name, region, proxy_config) |
| 153 | + config.set('jobwatcher', 'asg_name', asg_name) |
| 154 | + log.info("Saving asg_name %s in the config file %s" % (asg_name, _configfilename)) |
| 155 | + with open(_configfilename, 'w') as configfile: |
| 156 | + config.write(configfile) |
| 157 | + |
| 158 | + # fetch the pricing file on startup |
| 159 | + fetch_pricing_file(proxy_config, cfncluster_dir, region) |
| 160 | + |
| 161 | + # load scheduler |
| 162 | + s = load_scheduler_module(scheduler) |
| 163 | + |
| 164 | + while True: |
| 165 | + # get the number of vcpu's per compute instance |
| 166 | + instance_properties = get_instance_properties(instance_type) |
| 167 | + |
| 168 | + # Get number of nodes requested |
| 169 | + pending = s.get_required_nodes(instance_properties) |
| 170 | + |
| 171 | + # Get number of nodes currently |
| 172 | + running = s.get_busy_nodes(instance_properties) |
| 173 | + |
| 174 | + log.info("%s jobs pending; %s jobs running" % (pending, running)) |
| 175 | + |
| 176 | + if pending > 0: |
| 177 | + # connect to asg |
| 178 | + asg_conn = boto3.client('autoscaling', region_name=region, config=proxy_config) |
| 179 | + |
| 180 | + # get current limits |
| 181 | + asg = asg_conn.describe_auto_scaling_groups(AutoScalingGroupNames=[asg_name]).get('AutoScalingGroups')[0] |
| 182 | + |
| 183 | + min = asg.get('MinSize') |
| 184 | + current_desired = asg.get('DesiredCapacity') |
| 185 | + max = asg.get('MaxSize') |
| 186 | + log.info("min/desired/max %d/%d/%d" % (min, current_desired, max)) |
| 187 | + log.info("Nodes requested %d, Nodes running %d" % (pending, running)) |
| 188 | + |
| 189 | + # check to make sure it's in limits |
| 190 | + desired = running + pending |
| 191 | + if desired > max: |
| 192 | + log.info("%d requested nodes is greater than max %d. Requesting max %d." % (desired, max, max)) |
| 193 | + asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=max) |
| 194 | + elif desired <= current_desired: |
| 195 | + log.info("%d nodes desired %d nodes in asg. Noop" % (desired, current_desired)) |
| 196 | + else: |
| 197 | + log.info("Setting desired to %d nodes, requesting %d more nodes from asg." % (desired, desired - current_desired)) |
| 198 | + asg_conn.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=desired) |
| 199 | + |
| 200 | + time.sleep(60) |
| 201 | + |
| 202 | + |
| 203 | +if __name__ == '__main__': |
| 204 | + main() |
0 commit comments