forked from spdfg/elektron
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.go
294 lines (264 loc) · 12 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
// Copyright (C) 2018 spdfg
//
// This file is part of Elektron.
//
// Elektron is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Elektron is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Elektron. If not, see <http://www.gnu.org/licenses/>.
//
package main // import github.com/spdfg/elektron
import (
"flag"
"fmt"
"os"
"os/signal"
"strings"
"time"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
sched "github.com/mesos/mesos-go/api/v0/scheduler"
log "github.com/sirupsen/logrus"
"github.com/spdfg/elektron/def"
elekLog "github.com/spdfg/elektron/logging"
. "github.com/spdfg/elektron/logging/types"
"github.com/spdfg/elektron/pcp"
"github.com/spdfg/elektron/powerCap"
"github.com/spdfg/elektron/schedulers"
)
// Long-form command-line flags. init registers a shorthand alias for each of
// them, bound to the same variable.
var (
	// Mesos master and workload.
	master    = flag.String("master", "", "Location of leading Mesos master -- <mesos-master>:<port>")
	tasksFile = flag.String("workload", "", "JSON file containing task definitions")

	// Watts as a Resource.
	wattsAsAResource = flag.Bool("wattsAsAResource", false, "Enable Watts as a Resource")
	classMapWatts    = flag.Bool("classMapWatts", false, "Enable mapping of watts to power class of node")

	// PCP and logging configuration.
	pcpConfigFile     = flag.String("pcpConfigFile", "config", "PCP config file name (if file not present in the same directory, then provide path).")
	pcplogPrefix      = flag.String("logPrefix", "", "Prefix for pcplog")
	logConfigFilename = flag.String("logConfigFilename", "logConfig.yaml", "Log Configuration file name")

	// Power capping. The threshold defaults match the shorthand registrations
	// in init: registering a flag stores its default into the target variable,
	// so the shorthand defaults (700 / 400) are the ones actually in effect.
	powerCapPolicy = flag.String("powercap", "", "Power Capping policy. (default (''), extrema, prog-extrema).")
	hiThreshold    = flag.Float64("hiThreshold", 700.0, "Upperbound for when we should start capping")
	loThreshold    = flag.Float64("loThreshold", 400.0, "Lowerbound for when we should start uncapping")

	// Scheduling policy selection and runtime switching.
	schedPolicyName         = flag.String("schedPolicy", "first-fit", "Name of the scheduling policy to be used.\n\tUse option -listSchedPolicies to get the names of available scheduling policies")
	listSchedPolicies       = flag.Bool("listSchedPolicies", false, "List the names of the pluggable scheduling policies.")
	enableSchedPolicySwitch = flag.Bool("switchSchedPolicy", false, "Enable switching of scheduling policies at runtime.")
	schedPolConfigFile      = flag.String("schedPolConfig", "", "Config file that contains information for each scheduling policy.")
	fixFirstSchedPol        = flag.String("fixFirstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled.")
	fixSchedWindow          = flag.Bool("fixSchedWindow", false, "Fix the size of the scheduling window that every deployed scheduling policy should schedule, provided switching is enabled.")
	schedWindowSize         = flag.Int("schedWindowSize", 200, "Size of the scheduling window if fixSchedWindow is set.")
	schedPolSwitchCriteria  = flag.String("schedPolSwitchCriteria", "taskDist", "Scheduling policy switching criteria.")
)
// init registers the shorthand aliases of the command-line flags.
//
// Every alias is bound to the same variable as its long-form flag, so either
// spelling sets the same value. Registering a flag stores its default into the
// target variable, and init runs after the package-level variables have been
// initialised, so the defaults given here are the ones in effect when neither
// spelling is passed on the command line.
func init() {
	flag.StringVar(master, "m", "", "Location of leading Mesos master (shorthand)")
	flag.StringVar(tasksFile, "w", "", "JSON file containing task definitions (shorthand)")
	flag.BoolVar(wattsAsAResource, "waar", false, "Enable Watts as a Resource (shorthand)")
	flag.StringVar(pcpConfigFile, "pcpCF", "config", "PCP config file name (if not present in the same directory, then provide path) (shorthand).")
	flag.StringVar(pcplogPrefix, "p", "", "Prefix for pcplog (shorthand)")
	flag.StringVar(powerCapPolicy, "pc", "", "Power Capping policy. (default (''), extrema, prog-extrema) (shorthand).")
	flag.Float64Var(hiThreshold, "ht", 700.0, "Upperbound for when we should start capping (shorthand)")
	flag.Float64Var(loThreshold, "lt", 400.0, "Lowerbound for when we should start uncapping (shorthand)")
	flag.BoolVar(classMapWatts, "cmw", false, "Enable mapping of watts to power class of node (shorthand)")
	flag.StringVar(schedPolicyName, "sp", "first-fit", "Name of the scheduling policy to be used.\n Use option -listSchedPolicies to get the names of available scheduling policies (shorthand)")
	flag.BoolVar(listSchedPolicies, "lsp", false, "Names of the pluggable scheduling policies. (shorthand)")
	flag.BoolVar(enableSchedPolicySwitch, "ssp", false, "Enable switching of scheduling policies at runtime (shorthand).")
	flag.StringVar(schedPolConfigFile, "spConfig", "", "Config file that contains information for each scheduling policy (shorthand).")
	flag.StringVar(fixFirstSchedPol, "fxFstSchedPol", "", "Name of the scheduling policy to be deployed first, regardless of the distribution of tasks, provided switching is enabled (shorthand).")
	flag.BoolVar(fixSchedWindow, "fixSw", false, "Fix the size of the scheduling window that every deployed scheduling policy should schedule, provided switching is enabled (shorthand).")
	flag.IntVar(schedWindowSize, "swSize", 200, "Size of the scheduling window if fixSchedWindow is set (shorthand).")
	flag.StringVar(schedPolSwitchCriteria, "spsCriteria", "taskDist", "Scheduling policy switching criteria (shorthand).")
	flag.StringVar(logConfigFilename, "lgCfg", "logConfig.yaml", "Log Configuration file name (shorthand).")
}
// listAllSchedulingPolicies prints a two-line header followed by the name of
// every entry in schedulers.SchedPolicies, one per line, to standard output.
// The names come from ranging over a map, so their order is not fixed between
// runs.
func listAllSchedulingPolicies() {
	fmt.Print("Scheduling Policies\n-------------------\n")
	for name := range schedulers.SchedPolicies {
		fmt.Println(name)
	}
}
// main parses the command-line flags, validates them, builds the scheduler
// from the resulting options, starts PCP logging (optionally combined with a
// power-capping policy) and then runs the Mesos scheduler driver until it
// stops.
func main() {
	flag.Parse()

	// Checking to see if we need to just list the pluggable scheduling policies.
	// Listing them on request is a successful run, so exit with status 0.
	if *listSchedPolicies {
		listAllSchedulingPolicies()
		os.Exit(0)
	}

	// First we need to build the scheduler using scheduler options.
	schedOptions := make([]schedulers.SchedulerOptions, 0, 10)

	// OPTIONAL PARAMETERS
	// Scheduling Policy Name
	// If non-default scheduling policy given, checking if name exists.
	if *schedPolicyName != "first-fit" {
		if _, ok := schedulers.SchedPolicies[*schedPolicyName]; !ok {
			// Invalid scheduling policy.
			log.Println("Invalid scheduling policy given. The possible scheduling policies are:")
			listAllSchedulingPolicies()
			os.Exit(1)
		}
	}

	// CHANNELS AND FLAGS.
	shutdown := make(chan struct{})
	done := make(chan struct{})
	pcpLog := make(chan struct{})
	recordPCP := false

	// Shutdown indicator channels.
	// These channels are used to notify,
	// 1. scheduling is complete.
	// 2. all scheduled tasks have completed execution and framework can shutdown.
	schedOptions = append(schedOptions, schedulers.WithShutdown(shutdown))
	schedOptions = append(schedOptions, schedulers.WithDone(done))

	// If here, then valid scheduling policy name provided.
	schedOptions = append(schedOptions, schedulers.WithSchedPolicy(*schedPolicyName))

	// Scheduling Policy Switching.
	if *enableSchedPolicySwitch {
		// Scheduling policy config file required.
		if *schedPolConfigFile == "" {
			log.Fatal("Scheduling policy characteristics file not provided.")
		}
		// Initializing the characteristics of the scheduling policies.
		schedulers.InitSchedPolicyCharacteristics(*schedPolConfigFile)
		schedOptions = append(schedOptions, schedulers.WithSchedPolSwitchEnabled(*enableSchedPolicySwitch, *schedPolSwitchCriteria))
		// Fix First Scheduling Policy.
		schedOptions = append(schedOptions, schedulers.WithNameOfFirstSchedPolToFix(*fixFirstSchedPol))
		// Fix Scheduling Window.
		schedOptions = append(schedOptions, schedulers.WithFixedSchedulingWindow(*fixSchedWindow, *schedWindowSize))
	}

	// Watts as a Resource (WaaR) and ClassMapWatts (CMW).
	// If WaaR and CMW is enabled then for each task the class_to_watts mapping is used to
	// fit tasks into offers.
	// If CMW is disabled, then the Median of Medians Max Peak Power Usage value is used
	// as the watts value for each task.
	if *wattsAsAResource {
		log.Println("WaaR enabled...")
		schedOptions = append(schedOptions, schedulers.WithWattsAsAResource(*wattsAsAResource))
		schedOptions = append(schedOptions, schedulers.WithClassMapWatts(*classMapWatts))
	}

	// REQUIRED PARAMETERS.
	// PCP logging, Power capping and High and Low thresholds.
	schedOptions = append(schedOptions, schedulers.WithRecordPCP(&recordPCP))
	schedOptions = append(schedOptions, schedulers.WithPCPLog(pcpLog))

	// Indicating which power capping algorithm to use, if any.
	// The pcp-logging with/without power capping will be run after the
	// scheduler has been configured.
	var noPowercap, extrema, progExtrema bool
	switch *powerCapPolicy {
	case "":
		noPowercap = true
	case "extrema":
		extrema = true
	case "prog-extrema":
		progExtrema = true
	default:
		log.Fatal("Incorrect power-capping algorithm specified.")
	}

	// High and Low thresholds are currently only needed for extrema and
	// progressive extrema.
	// These values are not used to configure the scheduler.
	// hiThreshold and loThreshold are passed to the powercappers.
	if (extrema || progExtrema) && *hiThreshold < *loThreshold {
		log.Fatal("High threshold is of a lower value than low " +
			"threshold.")
	}

	// Tasks
	// If httpServer is disabled, then path of file containing workload needs to be provided.
	if *tasksFile == "" {
		log.Fatal("Tasks specifications file not provided.")
	}
	tasks, err := def.TasksFromJSON(*tasksFile)
	if err != nil {
		log.Fatal(err)
	}
	// An empty workload is reported on its own so the message is not "<nil>".
	if len(tasks) == 0 {
		log.Fatalf("No tasks found in workload file %q.", *tasksFile)
	}
	schedOptions = append(schedOptions, schedulers.WithTasks(tasks))

	// Scheduler.
	scheduler := schedulers.SchedFactory(schedOptions...)

	// Scheduler driver.
	driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
		Master: *master,
		Framework: &mesos.FrameworkInfo{
			Name: proto.String("Elektron"),
			User: proto.String(""),
		},
		Scheduler: scheduler,
	})
	if err != nil {
		log.Fatalf("Unable to create scheduler driver: %s", err)
	}

	// Checking if prefix contains any special characters.
	if strings.Contains(*pcplogPrefix, "/") {
		log.Fatal("log file prefix should not contain '/'.")
	}

	// Build Logger.
	if err := elekLog.BuildLogger(*pcplogPrefix, *logConfigFilename); err != nil {
		log.Fatal(err)
	}

	// Starting PCP logging.
	switch {
	case noPowercap:
		go pcp.Start(pcpLog, &recordPCP, *pcpConfigFile)
	case extrema:
		go powerCap.StartPCPLogAndExtremaDynamicCap(pcpLog, &recordPCP, *hiThreshold,
			*loThreshold, *pcpConfigFile)
	case progExtrema:
		go powerCap.StartPCPLogAndProgressiveExtremaCap(pcpLog, &recordPCP, *hiThreshold,
			*loThreshold, *pcpConfigFile)
	}

	// Take a second between starting PCP log and continuing.
	time.Sleep(1 * time.Second)

	// Attempt to handle SIGINT to not leave pmdumptext running.
	// Only os.Interrupt is registered: SIGKILL cannot be trapped by a program,
	// so asking to be notified about os.Kill has no effect.
	// NOTE(review): done is also handed to the scheduler through WithDone; if
	// the scheduler closes it as well, a SIGINT arriving afterwards would close
	// it a second time -- confirm against the schedulers package.
	go func() {
		c := make(chan os.Signal, 1)
		signal.Notify(c, os.Interrupt)
		<-c
		log.Println("Received SIGINT... stopping")
		close(done)
	}()

	go func() {
		// Wait until every task has been scheduled. done is watched here too so
		// that an interrupt received before scheduling completes is not held up
		// until the scheduler closes shutdown.
		select {
		case <-shutdown:
		case <-done:
		}

		// All tasks have finished (or the run was interrupted).
		<-done
		close(pcpLog)
		time.Sleep(5 * time.Second) // Wait for PCP to log a few more seconds.

		// Done shutting down.
		driver.Stop(false)
	}()

	// Starting the scheduler driver.
	if status, err := driver.Run(); err != nil {
		elekLog.WithFields(log.Fields{
			"status": status.String(),
			"error":  err.Error(),
		}).Log(CONSOLE, log.ErrorLevel, "Framework stopped ")
	}
	elekLog.Log(CONSOLE, log.InfoLevel, "Exiting...")
}