Skip to content

Commit

Permalink
Fix the bug related to asyncio.CancelledError error
Browse files Browse the repository at this point in the history
  • Loading branch information
yym68686 committed Aug 25, 2024
1 parent 2a42096 commit 0f410b4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 59 deletions.
3 changes: 2 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async def process_request(request: RequestModel, provider: Dict):
else:
return await fetch_response(app.state.client, url, headers, payload)

import asyncio
class ModelRequestHandler:
def __init__(self):
self.last_provider_index = -1
Expand Down Expand Up @@ -154,7 +155,7 @@ async def try_all_providers(self, request: RequestModel, providers: List[Dict],
try:
response = await process_request(request, provider)
return response
except (Exception, HTTPException) as e:
except (Exception, HTTPException, asyncio.CancelledError, httpx.ReadError) as e:
logger.error(f"Error with provider {provider['provider']}: {str(e)}")
if auto_retry:
continue
Expand Down
65 changes: 7 additions & 58 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,62 +73,7 @@ def ensure_string(item):
else:
return str(item)

# import asyncio
# async def error_handling_wrapper(generator, status_code=200):
# async def new_generator():
# try:
# yield ensure_string(first_item)
# async for item in generator:
# yield ensure_string(item)
# except (httpx.ReadError, asyncio.CancelledError) as e:
# logger.error(f"Network error in new_generator: {e}")
# raise HTTPException(status_code=503, detail=f"Stream interrupted: {str(e)}")
# except Exception as e:
# logger.exception(f"Error in new_generator: {e}")
# raise HTTPException(status_code=status_code, detail=f"Stream error: {str(e)}")

# try:
# first_item = await generator.__anext__()
# first_item_str = first_item
# if isinstance(first_item_str, (bytes, bytearray)):
# first_item_str = first_item_str.decode("utf-8")
# if isinstance(first_item_str, str):
# if first_item_str.startswith("data: "):
# first_item_str = first_item_str[6:]
# elif first_item_str.startswith("data:"):
# first_item_str = first_item_str[5:]
# if first_item_str.startswith("[DONE]"):
# logger.error("error_handling_wrapper [DONE]!")
# raise StopAsyncIteration
# try:
# first_item_str = json.loads(first_item_str)
# except json.JSONDecodeError:
# logger.error("error_handling_wrapper JSONDecodeError!" + repr(first_item_str))
# raise StopAsyncIteration
# if isinstance(first_item_str, dict) and 'error' in first_item_str:
# raise HTTPException(status_code=status_code, detail=f"{first_item_str}"[:300])

# wrapped_generator = new_generator()
# try:
# async for item in wrapped_generator:
# yield item
# except HTTPException as http_exc:
# raise HTTPException(status_code=status_code, detail=f"Wrapper error: {str(http_exc)}")
# except (httpx.ReadError, asyncio.CancelledError) as e:
# logger.error(f"Network error during streaming: {e}")
# raise HTTPException(status_code=503, detail=f"Stream interrupted: {str(e)}")
# except Exception as e:
# logger.exception(f"Unexpected error in error_handling_wrapper: {e}")
# raise HTTPException(status_code=status_code, detail=f"Unexpected error: {str(e)}")

# except StopAsyncIteration:
# raise HTTPException(status_code=status_code, detail="data: {'error': 'No data returned'}")
# except HTTPException as http_exc:
# raise HTTPException(status_code=status_code, detail=f"Wrapper error: {str(http_exc)}")
# except Exception as e:
# logger.exception(f"Error in error_handling_wrapper: {e}")
# raise HTTPException(status_code=status_code, detail=f"Wrapper error: {str(e)}")

import asyncio
async def error_handling_wrapper(generator, status_code=200):
try:
first_item = await generator.__anext__()
Expand All @@ -155,8 +100,12 @@ async def error_handling_wrapper(generator, status_code=200):
# 如果不是错误,创建一个新的生成器,首先yield第一个项,然后yield剩余的项
async def new_generator():
yield ensure_string(first_item)
async for item in generator:
yield ensure_string(item)
try:
async for item in generator:
yield ensure_string(item)
except (httpx.ReadError, asyncio.CancelledError) as e:
logger.error(f"Network error in new_generator: {e}")
raise

return new_generator()

Expand Down

0 comments on commit 0f410b4

Please sign in to comment.