forked from kernelci/kernelci-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimeout.py
executable file
·211 lines (169 loc) · 6.48 KB
/
timeout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python3
#
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2022 Collabora Limited
# Author: Jeny Sadadia <[email protected]>
import sys
from datetime import datetime
from time import sleep
import json
import requests
import kernelci
import kernelci.config
import kernelci.db
from kernelci.cli import Args, Command, parse_opts
from base import Service
class TimeoutService(Service):
    """Shared helpers for the services that move lapsed nodes to a new state.

    Subclasses provide the polling loop (``_run``); this class provides the
    API queries used to find pending nodes and their children, and the
    method that submits the state change.
    """

    def __init__(self, configs, args, name):
        """Set up the base service and cache the pending states and user."""
        super().__init__(configs, args, name)
        # Every node state exposed by the API except DONE is "pending".
        self._pending_states = [
            state.value for state in self._api.node_states
            if state != state.DONE
        ]
        self._user = self._api.whoami()
        self._username = self._user['profile']['username']

    def _setup(self, args):
        """Return the context dictionary holding the polling period."""
        return {
            'poll_period': args.poll_period,
        }

    def _get_pending_nodes(self, filters=None):
        """Return ``{node id: node}`` for pending nodes matching *filters*.

        One API query is made per pending state, with the ``state`` key
        added to a copy of *filters* so the caller's dictionary is never
        modified.
        """
        nodes = {}
        node_filters = filters.copy() if filters else {}
        for state in self._pending_states:
            node_filters['state'] = state
            for node in self._api.get_nodes(node_filters):
                # Until permissions for the timeout service are fixed:
                if node['owner'] == self._username:
                    nodes[node['id']] = node
        return nodes

    def _count_running_child_nodes(self, parent_id):
        """Return how many direct children of *parent_id* are pending."""
        return sum(
            self._api.count_nodes({'parent': parent_id, 'state': state})
            for state in self._pending_states
        )

    def _get_child_nodes_recursive(self, node, state_filter=None):
        """Return ``{node id: node}`` for the pending descendants of *node*.

        Children are looked up with ``_get_pending_nodes`` and so are
        subject to the same state and owner restrictions.  When
        *state_filter* is given, only children in that exact state are
        collected; a child that does not match is skipped together with its
        own descendants.
        """
        recursive = {}
        child_nodes = self._get_pending_nodes({'parent': node['id']})
        for child_id, child in child_nodes.items():
            if state_filter is None or child['state'] == state_filter:
                # Record the child itself: merging only the result of the
                # deeper calls would leave the dictionary permanently empty.
                recursive[child_id] = child
                recursive.update(self._get_child_nodes_recursive(
                    child, state_filter
                ))
        return recursive

    @staticmethod
    def _get_error_detail(err):
        """Return the "detail" entry of an HTTP error body, else str(err)."""
        try:
            return json.loads(err.response.content).get("detail", [])
        except (AttributeError, TypeError, ValueError):
            # No response attached, body is not JSON, or the JSON document
            # is not an object: fall back to the exception text so that
            # logging the failure cannot raise in turn.
            return str(err)

    def _submit_lapsed_nodes(self, lapsed_nodes, state, log=None):
        """Set every node in *lapsed_nodes* to *state* through the API.

        Each node is copied before its state is changed, so the dictionaries
        passed in are left untouched.  When *log* is given, it is written to
        the debug log next to each node id.  An HTTP error on one node is
        logged and the remaining nodes are still processed.
        """
        for node_id, node in lapsed_nodes.items():
            node_update = node.copy()
            node_update['state'] = state
            if log:
                self.log.debug(f"{node_id} {log}")
            try:
                self._api.update_node(node_update)
            except requests.exceptions.HTTPError as err:
                self.log.error(self._get_error_detail(err))
class Timeout(TimeoutService):
    """Service setting nodes to 'done' once their timeout has lapsed."""

    def __init__(self, configs, args):
        super().__init__(configs, args, 'timeout')

    def _check_pending_nodes(self, pending_nodes):
        """Submit each pending node and its collected children as 'done'."""
        lapsed = {}
        for parent_id in pending_nodes:
            parent = pending_nodes[parent_id]
            lapsed[parent_id] = parent
            descendants = self._get_child_nodes_recursive(parent)
            lapsed.update(descendants)
        self._submit_lapsed_nodes(lapsed, 'done', 'TIMEOUT')

    def _run(self, ctx):
        """Poll for nodes whose timeout is in the past, once per period."""
        self.log.info("Looking for nodes with lapsed timeout...")
        self.log.info("Press Ctrl-C to stop.")
        self.log.info(f"Current user: {self._username}")

        while True:
            # Only nodes whose timeout is earlier than the current UTC time.
            now = datetime.utcnow()
            lapsed_filter = {'timeout__lt': now.isoformat()}
            self._check_pending_nodes(self._get_pending_nodes(lapsed_filter))
            sleep(ctx['poll_period'])

        # Not reached: the loop above has no exit other than an exception.
        return True
class Holdoff(TimeoutService):
    """Service handling 'available' nodes whose holdoff has lapsed."""

    def __init__(self, configs, args):
        super().__init__(configs, args, 'timeout-holdoff')

    def _get_available_nodes(self):
        """Return ``{node id: node}`` for available nodes past holdoff."""
        now = datetime.utcnow().isoformat()
        query = {'state': 'available', 'holdoff__lt': now}
        found = {}
        for node in self._api.get_nodes(query):
            found[node['id']] = node
        return found

    def _check_available_nodes(self, available_nodes):
        """Send each node to 'closing' or 'done' depending on its children.

        A node that still has pending direct children goes to 'closing'
        along with its collected 'available' children; otherwise it goes to
        'done' along with all its collected children.
        """
        closing, finished = {}, {}
        for node_id, node in available_nodes.items():
            if self._count_running_child_nodes(node_id):
                target, state_filter = closing, 'available'
            else:
                target, state_filter = finished, None
            # Children first, then the node itself, in the same dictionary.
            target.update(self._get_child_nodes_recursive(node, state_filter))
            target[node_id] = node
        self._submit_lapsed_nodes(closing, 'closing', 'HOLDOFF')
        self._submit_lapsed_nodes(finished, 'done', 'DONE')

    def _run(self, ctx):
        """Poll for nodes with a lapsed holdoff, once per polling period."""
        self.log.info("Looking for nodes with lapsed holdoff...")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            self._check_available_nodes(self._get_available_nodes())
            sleep(ctx['poll_period'])

        # Not reached: the loop above has no exit other than an exception.
        return True
class Closing(TimeoutService):
    """Service setting 'closing' nodes to 'done' once no child is pending."""

    def __init__(self, configs, args):
        super().__init__(configs, args, 'timeout-closing')

    def _get_closing_nodes(self):
        """Return ``{node id: node}`` for every node in 'closing' state."""
        closing = {}
        for node in self._api.get_nodes({'state': 'closing'}):
            closing[node['id']] = node
        return closing

    def _check_closing_nodes(self, closing_nodes):
        """Submit as 'done' the closing nodes with no pending direct child."""
        finished = {}
        for node_id, node in closing_nodes.items():
            running = self._count_running_child_nodes(node_id)
            self.log.debug(f"{node_id} RUNNING: {running}")
            if running:
                continue
            finished[node_id] = node
        self._submit_lapsed_nodes(finished, 'done', 'DONE')

    def _run(self, ctx):
        """Poll for nodes that are done closing, once per polling period."""
        self.log.info("Looking for nodes that are done closing...")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            self._check_closing_nodes(self._get_closing_nodes())
            sleep(ctx['poll_period'])

        # Not reached: the loop above has no exit other than an exception.
        return True
# Maps each value accepted by --mode to the service class implementing it.
MODES = dict(
    timeout=Timeout,
    holdoff=Holdoff,
    closing=Closing,
)
class cmd_run(Command):
    """Command running one of the timeout services selected with --mode."""

    help = "Set node state to done if maximum wait time is over"
    args = [
        Args.api_config,
        {
            'name': '--poll-period',
            'type': int,
            'help': "Polling period in seconds",
            'default': 60,
        },
        {
            'name': '--mode',
            'choices': MODES.keys(),
        },
    ]

    def __call__(self, configs, args):
        """Run the service matching ``args.mode`` and return its status.

        Returns False, after printing the accepted modes on stderr, when no
        known mode was provided.
        """
        # --mode has no default value, so it may be absent or None here;
        # report that clearly rather than failing on the MODES lookup.
        service_cls = MODES.get(getattr(args, 'mode', None))
        if service_cls is None:
            print(
                f"Missing or invalid --mode, choose from: {', '.join(MODES)}",
                file=sys.stderr,
            )
            return False
        return service_cls(configs, args).run(args)
if __name__ == '__main__':
    # Parse the command line, load the pipeline configuration and run the
    # selected command; only a status that is exactly True exits with 0.
    cli_opts = parse_opts('timeout', globals())
    pipeline_configs = kernelci.config.load('config/pipeline.yaml')
    succeeded = cli_opts.command(pipeline_configs, cli_opts) is True
    sys.exit(0 if succeeded else 1)