-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretry.config
89 lines (84 loc) · 3.46 KB
/
retry.config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
retry {
/*
* Function for updating resource allocation based on retry number
* Args:
* base_val: the default or initial value to allocate
* operation: mathematical operation for updating value (add, subtract, exponential)
* operand: the numeric value used to perform the specified `operation`
* attempt: task attempt number, initially starts as 1 for the first run
* type: type of resource for update (cpus, memory, time)
*/
retry_updater = { base_val, String operation, operand, int attempt, String type ->
    // Number of retries already made: 0 on the first run, since `attempt` starts at 1
    def retries_done = attempt - 1
    def updated

    // Work out the candidate value for the requested operation
    switch (operation) {
        case 'add':
            updated = base_val + ( retries_done * operand )
            break
        case 'exponential':
            updated = base_val * Math.pow( operand, retries_done )
            break
        case 'subtract':
            updated = base_val - ( retries_done * operand )
            break
        default:
            // Unknown operation: warn and hand back the base value as-is
            // (this path does not go through methods.check_limits)
            println " ### WARNING ### Unrecognized operation '${operation}'! Using base value: $base_val "
            return base_val
    }

    // Every recognized operation is passed through the limit check for this resource type
    return methods.check_limits( updated, type )
}
/*
* Function for storing resource update values in params with process name as key
*/
cache_retry_data = {
    // Start from an empty cache every time this runs
    params.proc_resource_params = [:]

    for (selector in process) {
        // Only `withName:` entries are cached; everything else in `process` is skipped
        if (!selector.key.startsWith('withName:')) {
            continue
        }

        // A selector may name several processes separated by `|`
        def selected_names = selector.key.split('withName:')[1].split("\\|")
        for (selected_name in selected_names) {
            // Each name gets its own top-level copy of the selector's settings
            // (values themselves are not deep-copied)
            def settings_copy = [:]
            for (setting in selector.value) {
                settings_copy[setting.key] = setting.value
            }
            params.proc_resource_params[selected_name] = settings_copy
        }
    }
}
/*
* Function for checking if retry strategy for a process is defined and generating the specific updater
* Looking for a retry_strategy namespace for given process:
* retry_strategy {
* type {
* strategy = strategy type
* operand = operand for strategy
* }
* }
*/
set_retry = { String proc_name, String type ->
    def proc_settings = process[proc_name]
    def strategy_for_type = proc_settings?.retry_strategy?[type]

    // An updater is installed only when the process defines a base value for `type`
    // and a retry strategy for `type` with both `strategy` and `operand` set (all truthy)
    def fully_configured = strategy_for_type &&
        proc_settings?[type] &&
        strategy_for_type?.strategy &&
        strategy_for_type?.operand
    if (!fully_configured) {
        return
    }

    // Replace the static value with a closure that recomputes it from the cached
    // settings each time it is evaluated, using the current task attempt
    proc_settings[type] = {
        // The cache is keyed by the last ':'-separated segment of the task's process name
        def cached_settings = params.proc_resource_params[task.process.split(':')[-1]]
        def cached_strategy = cached_settings.retry_strategy[type]
        retry.retry_updater(
            cached_settings[type],
            cached_strategy.strategy,
            cached_strategy.operand,
            task.attempt,
            type
        )
    }
}
/*
* Function for iterating over process resource definitions and generating retry updaters as necessary
*/
generate_retry_updater = {
    // Snapshot the `withName:` selector keys up front; grep returns a separate list,
    // so editing the selected entries below does not disturb the iteration
    def proc_name_keys = process.keySet().grep(~/^withName:.*/)
    for (i in proc_name_keys) {
        // Install updaters for the resource types handled here
        retry.set_retry(i, 'cpus')
        retry.set_retry(i, 'memory')

        // Always strip the retry metadata once the updaters are in place, so it does
        // not remain among the process settings. The removal is unconditional on
        // purpose: a truthiness guard skips an empty `retry_strategy` entry, and
        // (assuming these entries are Groovy ConfigObjects - TODO confirm) merely
        // reading a missing `retry_strategy` creates exactly such an empty entry.
        // Removing an absent key is a no-op.
        process[i].remove('retry_strategy')
    }
}
/*
* Entry point for generating the retry updaters
*/
setup_retry = {
    // Step 1: snapshot the per-process settings into params. This must run first,
    // because the next step removes `retry_strategy` from the process settings
    retry.cache_retry_data.call()
    // Step 2: install the retry updaters
    retry.generate_retry_updater.call()
}
}