Skip to content

Commit

Permalink
[fix] Resolve issue with new runs after tracking queue shutdown (#3134)
Browse files: browse the repository at this point in the history
  • Loading branch information
mihran113 authored Apr 17, 2024
1 parent b172492 commit 56b915a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## 3.19.3
## 3.19.3
- Resolve issue with new runs after tracking queue shutdown (mihran113)
- Reset base path when opening new tabs (mihran113)

## 3.19.2 Mar 22, 2024
Expand Down
15 changes: 6 additions & 9 deletions aim/ext/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,7 @@
class Client:
_thread_local = threading.local()

_queue = RequestQueue(
'remote_tracker',
max_queue_memory=os.getenv(AIM_CLIENT_QUEUE_MAX_MEMORY, 1024 * 1024 * 1024),
retry_count=DEFAULT_RETRY_COUNT,
retry_interval=DEFAULT_RETRY_INTERVAL
)

def __init__(self, remote_path: str):
# temporary workaround for M1 build

self._id = str(uuid.uuid4())
if remote_path.endswith('/'):
remote_path = remote_path[:-1]
Expand All @@ -58,6 +49,12 @@ def __init__(self, remote_path: str):
self._tracking_endpoint = f'{self.remote_path}/tracking'
self.connect()

self._queue = RequestQueue(
f'remote_tracker_{self._id}',
max_queue_memory=os.getenv(AIM_CLIENT_QUEUE_MAX_MEMORY, 1024 * 1024 * 1024),
retry_count=DEFAULT_RETRY_COUNT,
retry_interval=DEFAULT_RETRY_INTERVAL
)
self._heartbeat_sender = HeartbeatSender(self)
self._heartbeat_sender.start()
self._thread_local.atomic_instructions = {}
Expand Down
4 changes: 2 additions & 2 deletions aim/ext/transport/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def register_task(self, client, task_f, *args):
self._client = weakref.ref(client)

if self._shutdown:
logger.debug('Cannot register task: rpc task queue is stopped.')
logger.debug('Cannot register task: task queue is stopped.')
return

arg_size = self._calculate_size(args)
Expand Down Expand Up @@ -100,7 +100,7 @@ def wait_for_finish(self):
def stop(self):
pending_task_count = self._queue.qsize()
if pending_task_count:
logger.warning(f'Processing {pending_task_count} pending tasks in the rpc queue \'{self._name}\'... '
logger.warning(f'Processing {pending_task_count} pending tasks in the task queue \'{self._name}\'... '
f'Please do not kill the process.')
self._queue.join()
logger.debug('No pending tasks left.')
Expand Down

0 comments on commit 56b915a

Please sign in to comment.