28
28
cfnconfig_file = '/opt/parallelcluster/cfnconfig'
29
29
30
30
31
- def load_scheduler_module (scheduler ):
31
+ def _load_scheduler_module (scheduler ):
32
+ """
33
+ Load scheduler module, containing scheduler specific functions.
34
+
35
+ :param scheduler: scheduler name, it must corresponds to the <scheduler>.py file in the current folder.
36
+ :return: the scheduler module
37
+ """
32
38
scheduler = 'jobwatcher.plugins.' + scheduler
33
39
_scheduler = __import__ (scheduler )
34
40
_scheduler = sys .modules [scheduler ]
@@ -38,7 +44,15 @@ def load_scheduler_module(scheduler):
38
44
return _scheduler
39
45
40
46
41
- def get_asg_name (stack_name , region , proxy_config ):
47
+ def _get_asg_name (stack_name , region , proxy_config ):
48
+ """
49
+ Get autoscaling group name.
50
+
51
+ :param stack_name: stack name to search for
52
+ :param region: AWS region
53
+ :param proxy_config: Proxy configuration
54
+ :return: the ASG name
55
+ """
42
56
asg_conn = boto3 .client ('autoscaling' , region_name = region , config = proxy_config )
43
57
asg_name = ""
44
58
no_asg = True
@@ -48,14 +62,19 @@ def get_asg_name(stack_name, region, proxy_config):
48
62
r = asg_conn .describe_tags (Filters = [{'Name' : 'value' , 'Values' : [stack_name ]}])
49
63
asg_name = r .get ('Tags' )[0 ].get ('ResourceId' )
50
64
no_asg = False
51
- except IndexError as e :
65
+ except IndexError :
52
66
log .error ("No asg found for cluster %s" % stack_name )
53
67
time .sleep (30 )
54
68
55
69
return asg_name
56
70
57
71
58
- def read_cfnconfig ():
72
+ def _read_cfnconfig ():
73
+ """
74
+ Read configuration file.
75
+
76
+ :return: a dictionary containing the configuration parameters
77
+ """
59
78
cfnconfig_params = {}
60
79
with open (cfnconfig_file ) as f :
61
80
for kvp in f :
@@ -64,61 +83,94 @@ def read_cfnconfig():
64
83
return cfnconfig_params
65
84
66
85
67
- def get_vcpus_from_pricing_file (instance_type ):
86
+ def _get_vcpus_from_pricing_file (instance_type ):
87
+ """
88
+ Read pricing file and get number of vcpus for the given instance type.
89
+
90
+ :param instance_type: the instance type to search for.
91
+ :return: the number of vcpus or -1 if the instance type cannot be found
92
+ """
68
93
with open (pricing_file ) as f :
69
94
instances = json .load (f )
70
95
try :
71
96
vcpus = int (instances [instance_type ]["vcpus" ])
72
97
log .info ("Instance %s has %s vcpus." % (instance_type , vcpus ))
73
- return vcpus
74
- except KeyError as e :
75
- log .error ("Instance %s not found in file %s." % (instance_type , pricing_file ))
76
- exit (1 )
98
+ except KeyError :
99
+ log .error ("Unable to get vcpus from file %s. Instance type %s not found." % (pricing_file , instance_type ))
100
+ vcpus = - 1
77
101
102
+ return vcpus
78
103
79
- def get_instance_properties (instance_type ):
80
- cfnconfig_params = read_cfnconfig ()
104
+
105
+ def _get_instance_properties (instance_type ):
106
+ """
107
+ Get instance properties for the given instance type, according to the cfn_scheduler_slots configuration parameter.
108
+
109
+ :param instance_type: instance type to search for
110
+ :return: a dictionary containing the instance properties. E.g. {'slots': <slots>}
111
+ """
81
112
try :
113
+ cfnconfig_params = _read_cfnconfig ()
82
114
cfn_scheduler_slots = cfnconfig_params ["cfn_scheduler_slots" ]
83
- slots = 0
84
- vcpus = get_vcpus_from_pricing_file (instance_type )
85
-
86
- if cfn_scheduler_slots == "cores" :
87
- log .info ("Instance %s will use number of cores as slots based on configuration." % instance_type )
88
- slots = - (- vcpus // 2 )
89
- elif cfn_scheduler_slots == "vcpus" :
90
- log .info ("Instance %s will use number of vcpus as slots based on configuration." % instance_type )
115
+ except KeyError :
116
+ log .error (
117
+ "Required config parameter 'cfn_scheduler_slots' not found in file %s. Assuming 'vcpus'" % cfnconfig_file
118
+ )
119
+ cfn_scheduler_slots = "vcpus"
120
+
121
+ vcpus = _get_vcpus_from_pricing_file (instance_type )
122
+
123
+ if cfn_scheduler_slots == "cores" :
124
+ log .info ("Instance %s will use number of cores as slots based on configuration." % instance_type )
125
+ slots = - (- vcpus // 2 )
126
+
127
+ elif cfn_scheduler_slots == "vcpus" :
128
+ log .info ("Instance %s will use number of vcpus as slots based on configuration." % instance_type )
129
+ slots = vcpus
130
+
131
+ elif cfn_scheduler_slots .isdigit ():
132
+ slots = int (cfn_scheduler_slots )
133
+ log .info ("Instance %s will use %s slots based on configuration." % (instance_type , slots ))
134
+
135
+ if slots <= 0 :
136
+ log .error (
137
+ "cfn_scheduler_slots config parameter '%s' must be greater than 0. "
138
+ "Assuming 'vcpus'" % cfn_scheduler_slots
139
+ )
91
140
slots = vcpus
92
- elif cfn_scheduler_slots . isdigit () :
93
- slots = int ( cfn_scheduler_slots )
94
- log . info ( "Instance %s will use %s slots based on configuration." % ( instance_type , slots ))
141
+ else :
142
+ log . error ( "cfn_scheduler_slots config parameter '%s' is invalid. Assuming 'vcpus'" % cfn_scheduler_slots )
143
+ slots = vcpus
95
144
96
- if not slots > 0 :
97
- log .critical ("cfn_scheduler_slots config parameter '%s' was invalid" % cfn_scheduler_slots )
98
- exit ( 1 )
145
+ if slots <= 0 :
146
+ log .critical ("slots value is invalid. Setting it to 0." )
147
+ slots = 0
99
148
100
- return {'slots' : slots }
149
+ return {'slots' : slots }
101
150
102
- except KeyError :
103
- log .error ("Required config parameter 'cfn_scheduler_slots' not found in file %s." % cfnconfig_file )
104
- exit (1 )
105
151
152
+ def _fetch_pricing_file (pcluster_dir , region , proxy_config ):
153
+ """
154
+ Download pricing file.
106
155
107
- def fetch_pricing_file (proxy_config , cfncluster_dir , region ):
156
+ :param proxy_config: Proxy Configuration
157
+ :param pcluster_dir: Parallelcluster configuration folder
158
+ :param region: AWS Region
159
+ """
108
160
s3 = boto3 .resource ('s3' , region_name = region , config = proxy_config )
109
161
try :
110
- if not os .path .exists (cfncluster_dir ):
111
- os .makedirs (cfncluster_dir )
162
+ if not os .path .exists (pcluster_dir ):
163
+ os .makedirs (pcluster_dir )
112
164
except OSError as ex :
113
- log .critical ('Could not create directory %s. Failed with exception: %s' % (cfncluster_dir , ex ))
165
+ log .critical ('Could not create directory %s. Failed with exception: %s' % (pcluster_dir , ex ))
114
166
raise
115
167
bucket_name = '%s-aws-parallelcluster' % region
116
168
try :
117
169
bucket = s3 .Bucket (bucket_name )
118
- bucket .download_file ('instances/instances.json' , '%s/instances.json' % cfncluster_dir )
170
+ bucket .download_file ('instances/instances.json' , '%s/instances.json' % pcluster_dir )
119
171
except ClientError as e :
120
172
log .critical ("Could not save instance mapping file %s/instances.json from S3 bucket %s. "
121
- "Failed with exception: %s" % (cfncluster_dir , bucket_name , e ))
173
+ "Failed with exception: %s" % (pcluster_dir , bucket_name , e ))
122
174
raise
123
175
124
176
@@ -139,7 +191,7 @@ def main():
139
191
scheduler = config .get ('jobwatcher' , 'scheduler' )
140
192
stack_name = config .get ('jobwatcher' , 'stack_name' )
141
193
instance_type = config .get ('jobwatcher' , 'compute_instance_type' )
142
- cfncluster_dir = config .get ('jobwatcher' , 'cfncluster_dir' )
194
+ pcluster_dir = config .get ('jobwatcher' , 'cfncluster_dir' )
143
195
_proxy = config .get ('jobwatcher' , 'proxy' )
144
196
proxy_config = Config ()
145
197
@@ -150,53 +202,70 @@ def main():
150
202
try :
151
203
asg_name = config .get ('jobwatcher' , 'asg_name' )
152
204
except ConfigParser .NoOptionError :
153
- asg_name = get_asg_name (stack_name , region , proxy_config )
205
+ asg_name = _get_asg_name (stack_name , region , proxy_config )
154
206
config .set ('jobwatcher' , 'asg_name' , asg_name )
155
207
log .info ("Saving asg_name %s in the config file %s" % (asg_name , _configfilename ))
156
208
with open (_configfilename , 'w' ) as configfile :
157
209
config .write (configfile )
158
210
159
211
# fetch the pricing file on startup
160
- fetch_pricing_file ( proxy_config , cfncluster_dir , region )
212
+ _fetch_pricing_file ( pcluster_dir , region , proxy_config )
161
213
162
214
# load scheduler
163
- s = load_scheduler_module (scheduler )
215
+ s = _load_scheduler_module (scheduler )
164
216
165
217
while True :
166
218
# get the number of vcpu's per compute instance
167
- instance_properties = get_instance_properties (instance_type )
168
-
169
- # Get number of nodes requested
170
- pending = s .get_required_nodes (instance_properties )
171
-
172
- # Get number of nodes currently
173
- running = s .get_busy_nodes (instance_properties )
174
-
175
- log .info ("%s jobs pending; %s jobs running" % (pending , running ))
219
+ instance_properties = _get_instance_properties (instance_type )
220
+ if instance_properties .get ('slots' ) <= 0 :
221
+ log .critical ("Error detecting number of slots per instance. The cluster will not scale up." )
176
222
177
- if pending > 0 :
178
- # connect to asg
179
- asg_conn = boto3 . client ( 'autoscaling' , region_name = region , config = proxy_config )
223
+ else :
224
+ # Get number of nodes requested
225
+ pending = s . get_required_nodes ( instance_properties )
180
226
181
- # get current limits
182
- asg = asg_conn . describe_auto_scaling_groups ( AutoScalingGroupNames = [ asg_name ]). get ( 'AutoScalingGroups' )[ 0 ]
227
+ if pending < 0 :
228
+ log . critical ( "Error detecting number of required nodes. The cluster will not scale up." )
183
229
184
- min = asg .get ('MinSize' )
185
- current_desired = asg .get ('DesiredCapacity' )
186
- max = asg .get ('MaxSize' )
187
- log .info ("min/desired/max %d/%d/%d" % (min , current_desired , max ))
188
- log .info ("Nodes requested %d, Nodes running %d" % (pending , running ))
230
+ elif pending == 0 :
231
+ log .debug ("There are no pending jobs. Noop." )
189
232
190
- # check to make sure it's in limits
191
- desired = running + pending
192
- if desired > max :
193
- log .info ("%d requested nodes is greater than max %d. Requesting max %d." % (desired , max , max ))
194
- asg_conn .update_auto_scaling_group (AutoScalingGroupName = asg_name , DesiredCapacity = max )
195
- elif desired <= current_desired :
196
- log .info ("%d nodes desired %d nodes in asg. Noop" % (desired , current_desired ))
197
233
else :
198
- log .info ("Setting desired to %d nodes, requesting %d more nodes from asg." % (desired , desired - current_desired ))
199
- asg_conn .update_auto_scaling_group (AutoScalingGroupName = asg_name , DesiredCapacity = desired )
234
+ # Get current number of nodes
235
+ running = s .get_busy_nodes (instance_properties )
236
+ log .info ("%s jobs pending; %s jobs running" % (pending , running ))
237
+
238
+ # connect to asg
239
+ asg_client = boto3 .client ('autoscaling' , region_name = region , config = proxy_config )
240
+
241
+ # get current limits
242
+ asg = asg_client .describe_auto_scaling_groups (AutoScalingGroupNames = [asg_name ]).get ('AutoScalingGroups' )[0 ]
243
+
244
+ min_size = asg .get ('MinSize' )
245
+ current_desired = asg .get ('DesiredCapacity' )
246
+ max_size = asg .get ('MaxSize' )
247
+ log .info ("min/desired/max %d/%d/%d" % (min_size , current_desired , max_size ))
248
+ log .info ("%d nodes requested, %d nodes running" % (pending , running ))
249
+
250
+ # Check to make sure requested number of instances is within ASG limits
251
+ required = running + pending
252
+ if required <= current_desired :
253
+ log .info ("%d nodes required, %d nodes in asg. Noop" % (required , current_desired ))
254
+ else :
255
+ if required > max_size :
256
+ log .info (
257
+ "The number of required nodes %d is greater than max %d. Requesting max %d."
258
+ % (required , max_size , max_size )
259
+ )
260
+ else :
261
+ log .info (
262
+ "Setting desired to %d nodes, requesting %d more nodes from asg."
263
+ % (required , required - current_desired )
264
+ )
265
+ requested = min (required , max_size )
266
+
267
+ # update ASG
268
+ asg_client .update_auto_scaling_group (AutoScalingGroupName = asg_name , DesiredCapacity = requested )
200
269
201
270
time .sleep (60 )
202
271
0 commit comments