From 56b915ab0f7d19554e312dac998b5a840757b998 Mon Sep 17 00:00:00 2001 From: mihran113 Date: Wed, 17 Apr 2024 14:54:38 +0400 Subject: [PATCH] [fix] Resolve issue with new runs after tracking queue shutdown (#3134) --- CHANGELOG.md | 3 ++- aim/ext/transport/client.py | 15 ++++++--------- aim/ext/transport/request_queue.py | 4 ++-- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e219571bf..f9c26f9df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog -## 3.19.3 +## 3.19.3 +- Resolve issue with new runs after tracking queue shutdown (mihran113) - Reset base path when opening new tabs (mihran113) ## 3.19.2 Mar 22, 2024 diff --git a/aim/ext/transport/client.py b/aim/ext/transport/client.py index b5703c354..e8f086052 100644 --- a/aim/ext/transport/client.py +++ b/aim/ext/transport/client.py @@ -32,16 +32,7 @@ class Client: _thread_local = threading.local() - _queue = RequestQueue( - 'remote_tracker', - max_queue_memory=os.getenv(AIM_CLIENT_QUEUE_MAX_MEMORY, 1024 * 1024 * 1024), - retry_count=DEFAULT_RETRY_COUNT, - retry_interval=DEFAULT_RETRY_INTERVAL - ) - def __init__(self, remote_path: str): - # temporary workaround for M1 build - self._id = str(uuid.uuid4()) if remote_path.endswith('/'): remote_path = remote_path[:-1] @@ -58,6 +49,12 @@ def __init__(self, remote_path: str): self._tracking_endpoint = f'{self.remote_path}/tracking' self.connect() + self._queue = RequestQueue( + f'remote_tracker_{self._id}', + max_queue_memory=os.getenv(AIM_CLIENT_QUEUE_MAX_MEMORY, 1024 * 1024 * 1024), + retry_count=DEFAULT_RETRY_COUNT, + retry_interval=DEFAULT_RETRY_INTERVAL + ) self._heartbeat_sender = HeartbeatSender(self) self._heartbeat_sender.start() self._thread_local.atomic_instructions = {} diff --git a/aim/ext/transport/request_queue.py b/aim/ext/transport/request_queue.py index 36142aaa8..18e009ae6 100644 --- a/aim/ext/transport/request_queue.py +++ b/aim/ext/transport/request_queue.py @@ -33,7 +33,7 @@ def register_task(self, client, task_f, *args): self._client = weakref.ref(client) if self._shutdown: - logger.debug('Cannot register task: rpc task queue is stopped.') + logger.debug('Cannot register task: task queue is stopped.') return arg_size = self._calculate_size(args) @@ -100,7 +100,7 @@ def wait_for_finish(self): def stop(self): pending_task_count = self._queue.qsize() if pending_task_count: - logger.warning(f'Processing {pending_task_count} pending tasks in the rpc queue \'{self._name}\'... ' + logger.warning(f'Processing {pending_task_count} pending tasks in the task queue \'{self._name}\'... ' f'Please do not kill the process.') self._queue.join() logger.debug('No pending tasks left.')