From 491201ac2d224985ebf33cea20ae3a3e6cfb2568 Mon Sep 17 00:00:00 2001 From: Anupam Kumar Date: Wed, 15 Jan 2025 13:45:58 +0530 Subject: [PATCH] fix: proper handling of index locks (#133) Signed-off-by: Anupam Kumar --- context_chat_backend/controller.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/context_chat_backend/controller.py b/context_chat_backend/controller.py index 1399c97..746c4a7 100644 --- a/context_chat_backend/controller.py +++ b/context_chat_backend/controller.py @@ -303,11 +303,10 @@ def _(sources: list[UploadFile]): if source.filename in _indexing: # this request will be retried by the client return JSONResponse( - f'Source already being processed: {source.filename}', + f'This source ({source.filename}) is already being processed in another request, try again later', 503, headers={'cc-retry': 'true'}, ) - _indexing[source.filename] = True if not ( value_of(source.headers.get('userIds')) @@ -317,22 +316,29 @@ def _(sources: list[UploadFile]): and source.headers['modified'].isdigit() and value_of(source.headers.get('provider')) ): + log_error('Invalid/missing headers received', source.filename, source.headers) return JSONResponse(f'Invaild/missing headers for: {source.filename}', 400) # wait for 10 minutes before failing the request semres = doc_parse_semaphore.acquire(block=True, timeout=10*60) if not semres: return JSONResponse( - 'Document parser worker limit reached, try again in some time', + 'Document parser worker limit reached, try again in some time or consider increasing the limit', 503, headers={'cc-retry': 'true'} ) - added_sources = exec_in_proc(target=embed_sources, args=(vectordb_loader, app.extra['CONFIG'], sources)) - doc_parse_semaphore.release() - for source in sources: + with index_lock: + for source in sources: + _indexing[source.filename] = True + + try: + added_sources = exec_in_proc(target=embed_sources, args=(vectordb_loader, app.extra['CONFIG'], sources)) + finally: with index_lock: - _indexing.pop(source.filename) + for source in sources: + _indexing.pop(source.filename, None) + doc_parse_semaphore.release() if len(added_sources) != len(sources): print(