Skip to content

Commit 4f6a805

Browse files
authored
Merge pull request #1579 from jluebbe/avoid-loop-blocking
improve handling of background tasks in the coordinator
2 parents fc33503 + aea1add commit 4f6a805

File tree

1 file changed

+51
-27
lines changed

1 file changed

+51
-27
lines changed

labgrid/remote/coordinator.py

+51-27
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import traceback
66
from enum import Enum
77
from functools import wraps
8+
import time
9+
from contextlib import contextmanager
10+
import copy
811

912
import attr
1013
import grpc
@@ -26,6 +29,19 @@
2629
from ..util import atomic_replace, labgrid_version, yaml
2730

2831

32+
@contextmanager
def warn_if_slow(prefix, *, level=logging.WARNING, limit=0.1):
    """Log a message if the wrapped block takes longer than *limit* seconds.

    Three clocks are sampled before and after the ``with`` body:
    ``time.monotonic()`` (elapsed real time), ``time.process_time()`` and
    ``time.thread_time()``. Only the monotonic delta is compared against
    *limit*; the other two deltas are included in the message.

    Args:
        prefix: Text placed at the start of the log message to identify the
            measured operation.
        level: Logging level used for the message (default ``WARNING``).
        limit: Threshold in seconds; a message is emitted only when the
            elapsed monotonic time is strictly greater than this value.

    Any exception raised inside the ``with`` body propagates unchanged; the
    timing check still runs first, so a slow operation that fails is
    reported as well.
    """
    start_monotonic = time.monotonic()
    start_process = time.process_time()
    start_thread = time.thread_time()
    try:
        yield
    finally:
        # Measure in ``finally`` so the report is not lost when the body raises.
        real = time.monotonic() - start_monotonic
        process = time.process_time() - start_process
        thread = time.thread_time() - start_thread
        if real > limit:
            logging.log(
                level,
                "%s: real %.3f>%.3f, process %.3f, thread %.3f",
                prefix, real, limit, process, thread,
            )
43+
44+
2945
class Action(Enum):
3046
ADD = 0
3147
DEL = 1
@@ -195,7 +211,7 @@ class Coordinator(labgrid_coordinator_pb2_grpc.CoordinatorServicer):
195211
def __init__(self) -> None:
196212
self.places: dict[str, Place] = {}
197213
self.reservations = {}
198-
self.poll_task = None
214+
self.poll_tasks = []
199215
self.save_scheduled = False
200216

201217
self.lock = asyncio.Lock()
@@ -204,32 +220,33 @@ def __init__(self) -> None:
204220
self.load()
205221

206222
self.loop = asyncio.get_running_loop()
207-
self.poll_task = self.loop.create_task(self.poll(), name="coordinator-poll")
223+
for name in ["save", "reacquire", "schedule"]:
224+
step_func = getattr(self, f"_poll_step_{name}")
225+
task = self.loop.create_task(self.poll(step_func), name=f"coordinator-poll-{name}")
226+
self.poll_tasks.append(task)
208227

209-
async def _poll_step(self):
228+
async def _poll_step_save(self):
210229
# save changes
211-
try:
212-
if self.save_scheduled:
230+
if self.save_scheduled:
231+
with warn_if_slow("save changes", level=logging.DEBUG):
213232
await self.save()
214-
except Exception: # pylint: disable=broad-except
215-
traceback.print_exc()
233+
234+
async def _poll_step_reacquire(self):
216235
# try to re-acquire orphaned resources
217-
try:
218-
async with self.lock:
236+
async with self.lock:
237+
with warn_if_slow("reacquire orphaned resources", limit=3.0):
219238
await self._reacquire_orphaned_resources()
220-
except Exception: # pylint: disable=broad-except
221-
traceback.print_exc()
239+
240+
async def _poll_step_schedule(self):
222241
# update reservations
223-
try:
242+
with warn_if_slow("schedule reservations"):
224243
self.schedule_reservations()
225-
except Exception: # pylint: disable=broad-except
226-
traceback.print_exc()
227244

228-
async def poll(self):
245+
async def poll(self, step_func):
229246
while not self.loop.is_closed():
230247
try:
231248
await asyncio.sleep(15.0)
232-
await self._poll_step()
249+
await step_func()
233250
except asyncio.CancelledError:
234251
break
235252
except Exception: # pylint: disable=broad-except
@@ -249,17 +266,24 @@ async def save(self):
249266
logging.debug("Running Save")
250267
self.save_scheduled = False
251268

252-
resources = self._get_resources()
253-
resources = yaml.dump(resources)
254-
resources = resources.encode()
255-
places = self._get_places()
256-
places = yaml.dump(places)
257-
places = places.encode()
258-
259-
logging.debug("Awaiting resources")
260-
await self.loop.run_in_executor(None, atomic_replace, "resources.yaml", resources)
261-
logging.debug("Awaiting places")
262-
await self.loop.run_in_executor(None, atomic_replace, "places.yaml", places)
269+
with warn_if_slow("create resources snapshot", level=logging.DEBUG):
270+
resources = copy.deepcopy(self._get_resources())
271+
with warn_if_slow("create places snapshot", level=logging.DEBUG):
272+
places = copy.deepcopy(self._get_places())
273+
274+
def save_sync(resources, places):
275+
with warn_if_slow("dump resources", level=logging.DEBUG):
276+
resources = yaml.dump(resources)
277+
resources = resources.encode()
278+
with warn_if_slow("dump places", level=logging.DEBUG):
279+
places = yaml.dump(places)
280+
places = places.encode()
281+
with warn_if_slow("write resources", level=logging.DEBUG):
282+
atomic_replace("resources.yaml", resources)
283+
with warn_if_slow("write places", level=logging.DEBUG):
284+
atomic_replace("places.yaml", places)
285+
286+
await self.loop.run_in_executor(None, save_sync, resources, places)
263287

264288
def load(self):
265289
try:

0 commit comments

Comments
 (0)