From 55a42476c22b1943ac80089dc0826bfd7dcd3aff Mon Sep 17 00:00:00 2001 From: yym68686 Date: Sun, 25 Aug 2024 20:46:00 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Bug:=201.=20Fix=20the=20bug=20of?= =?UTF-8?q?=20httpx=20reading=20streamed=20output=20timeout.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2. Increase read response timeout. --- main.py | 4 ++-- utils.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index fb65dc1..c700540 100644 --- a/main.py +++ b/main.py @@ -20,7 +20,7 @@ @asynccontextmanager async def lifespan(app: FastAPI): # 启动时的代码 - timeout = httpx.Timeout(connect=15.0, read=10.0, write=30.0, pool=30.0) + timeout = httpx.Timeout(connect=15.0, read=20.0, write=30.0, pool=30.0) app.state.client = httpx.AsyncClient(timeout=timeout) app.state.config, app.state.api_keys_db, app.state.api_list = await load_config(app) yield @@ -74,7 +74,7 @@ async def process_request(request: RequestModel, provider: Dict): if request.stream: model = provider['model'][request.model] generator = fetch_response_stream(app.state.client, url, headers, payload, engine, model) - wrapped_generator = await error_handling_wrapper(generator, status_code=500) + wrapped_generator = error_handling_wrapper(generator, status_code=500) return StreamingResponse(wrapped_generator, media_type="text/event-stream") else: return await fetch_response(app.state.client, url, headers, payload) diff --git a/utils.py b/utils.py index b9dd423..7c22e93 100644 --- a/utils.py +++ b/utils.py @@ -73,11 +73,16 @@ def ensure_string(item): else: return str(item) -async def async_generator(items): - for item in items: - yield item - async def error_handling_wrapper(generator, status_code=200): + async def new_generator(): + try: + yield ensure_string(first_item) + async for item in generator: + yield ensure_string(item) + except Exception as e: + logger.exception(f"Error in new_generator: {e}") + raise HTTPException(status_code=status_code, detail=f"Stream error: {str(e)}") + try: first_item = await generator.__anext__() first_item_str = first_item @@ -97,19 +102,60 @@ async def error_handling_wrapper(generator, status_code=200): logger.error("error_handling_wrapper JSONDecodeError!" + repr(first_item_str)) raise StopAsyncIteration if isinstance(first_item_str, dict) and 'error' in first_item_str: - # 如果第一个 yield 的项是错误信息,抛出 HTTPException raise HTTPException(status_code=status_code, detail=f"{first_item_str}"[:300]) - # 如果不是错误,创建一个新的生成器,首先yield第一个项,然后yield剩余的项 - async def new_generator(): - yield ensure_string(first_item) - async for item in generator: - yield ensure_string(item) - - return new_generator() + # 创建新的生成器并包装在 try-except 块中 + wrapped_generator = new_generator() + try: + async for item in wrapped_generator: + yield item + except HTTPException as http_exc: + raise http_exc + except Exception as e: + logger.exception(f"Unexpected error in error_handling_wrapper: {e}") + raise HTTPException(status_code=status_code, detail=f"Unexpected error: {str(e)}") except StopAsyncIteration: raise HTTPException(status_code=status_code, detail="data: {'error': 'No data returned'}") + except HTTPException as http_exc: + raise http_exc + except Exception as e: + logger.exception(f"Error in error_handling_wrapper: {e}") + raise HTTPException(status_code=status_code, detail=f"Wrapper error: {str(e)}") + +# async def error_handling_wrapper(generator, status_code=200): +# try: +# first_item = await generator.__anext__() +# first_item_str = first_item +# if isinstance(first_item_str, (bytes, bytearray)): +# first_item_str = first_item_str.decode("utf-8") +# if isinstance(first_item_str, str): +# if first_item_str.startswith("data: "): +# first_item_str = first_item_str[6:] +# elif first_item_str.startswith("data:"): +# first_item_str = first_item_str[5:] +# if first_item_str.startswith("[DONE]"): +# logger.error("error_handling_wrapper [DONE]!") +# raise StopAsyncIteration +# try: +# first_item_str = json.loads(first_item_str) +# except json.JSONDecodeError: +# logger.error("error_handling_wrapper JSONDecodeError!" + repr(first_item_str)) +# raise StopAsyncIteration +# if isinstance(first_item_str, dict) and 'error' in first_item_str: +# # 如果第一个 yield 的项是错误信息,抛出 HTTPException +# raise HTTPException(status_code=status_code, detail=f"{first_item_str}"[:300]) + +# # 如果不是错误,创建一个新的生成器,首先yield第一个项,然后yield剩余的项 +# async def new_generator(): +# yield ensure_string(first_item) +# async for item in generator: +# yield ensure_string(item) + +# return new_generator() + +# except StopAsyncIteration: +# raise HTTPException(status_code=status_code, detail="data: {'error': 'No data returned'}") def post_all_models(token, config, api_list): all_models = []