From 4ee73b3612f04e11d715c918452f036dd5698478 Mon Sep 17 00:00:00 2001
From: provokateurin
Date: Fri, 17 May 2024 11:53:29 +0200
Subject: [PATCH] feat: Add task processing API

Signed-off-by: provokateurin
---
 appinfo/info.xml |  1 +
 lib/main.py      | 60 ++++++++++++++++++++++--------------------------
 2 files changed, 28 insertions(+), 33 deletions(-)

diff --git a/appinfo/info.xml b/appinfo/info.xml
index dc002c8..553c246 100644
--- a/appinfo/info.xml
+++ b/appinfo/info.xml
@@ -27,6 +27,7 @@
 			<value>AI_PROVIDERS</value>
+			<value>TASK_PROCESSING</value>
 		</scopes>
 		<system>false</system>

diff --git a/lib/main.py b/lib/main.py
index 408d4d2..d9af0df 100644
--- a/lib/main.py
+++ b/lib/main.py
@@ -3,6 +3,7 @@
 import queue
 import threading
+import time
 import typing
 from contextlib import asynccontextmanager
 from time import perf_counter
@@ -13,7 +14,7 @@
 from nc_py_api import AsyncNextcloudApp, NextcloudApp
 from nc_py_api.ex_app import LogLvl, anc_app, run_app, set_handlers

-chains = generate_chains()
+#chains = generate_chains()

 @asynccontextmanager
 async def lifespan(_app: FastAPI):
@@ -27,19 +28,29 @@

 APP = FastAPI(lifespan=lifespan)
-TASK_LIST: queue.Queue = queue.Queue(maxsize=100)


 class BackgroundProcessTask(threading.Thread):
     def run(self, *args, **kwargs):  # pylint: disable=unused-argument
+        nc = NextcloudApp()
+
         while True:
-            task = TASK_LIST.get(block=True)
+            task = nc.providers.task_processing.next_task("core:text2text")
+
+            print(task)
+
+            # TODO: Load chain and execute task and report result
+
+            time.sleep(5)
+            continue
+
             try:
                 chain_name = task.get("chain")
                 print(f"chain: {chain_name}", flush=True)
+                """
                 chain_load = chains.get(chain_name)
                 if chain_load is None:
-                    NextcloudApp().providers.text_processing.report_result(
+                    NextcloudApp().providers.task_processing.report_result(
                         task["id"], error="Requested model is not available"
                     )
                     continue
@@ -49,8 +60,10 @@
                 result = chain.invoke(task.get("prompt")).get("text")
                 del chain
                 print(f"reply generated: {perf_counter() - time_start}s", flush=True)
+                """
+                result = "test"
                 print(result, flush=True)
-                NextcloudApp().providers.text_processing.report_result(
+                NextcloudApp().providers.task_processing.report_result(
                     task["id"],
                     str(result).split(sep="<|assistant|>", maxsplit=1)[-1].strip(),
                 )
@@ -58,39 +71,20 @@
                 print(str(e), flush=True)
                 nc = NextcloudApp()
                 nc.log(LogLvl.ERROR, str(e))
-                nc.providers.text_processing.report_result(task["id"], error=str(e))
-
-
-class Input(pydantic.BaseModel):
-    prompt: str
-    task_id: int
-
-
-@APP.post("/chain/{chain_name}")
-async def tiny_llama(
-    _nc: typing.Annotated[AsyncNextcloudApp, Depends(anc_app)],
-    req: Input,
-    chain_name=None,
-):
-    try:
-        TASK_LIST.put({"prompt": req.prompt, "id": req.task_id, "chain": chain_name}, block=False)
-    except queue.Full:
-        return responses.JSONResponse(content={"error": "task queue is full"}, status_code=429)
-    return responses.Response()
-
+                nc.providers.task_processing.report_result(task["id"], error=str(e))

 async def enabled_handler(enabled: bool, nc: AsyncNextcloudApp) -> str:
     print(f"enabled={enabled}", flush=True)
     if enabled is True:
-        for chain_name, _ in chains.items():
-            (model, task) = chain_name.split(":", 2)
-            await nc.providers.text_processing.register(
-                "llm2:"+chain_name, "Local Large language Model: " + model, "/chain/" + chain_name, task
-            )
+        #for chain_name, _ in chains.items():
+        #    (model, task) = chain_name.split(":", 2)
+        await nc.providers.task_processing.register(
+            "llm2:"+"test", "Local Large language Model: " + "test", "core:text2text"
+        )
     else:
-        for chain_name, chain in chains.items():
-            (model, task) = chain_name.split(":", 2)
-            await nc.providers.text_processing.unregister(model)
+        #for chain_name, chain in chains.items():
+        #    (model, task) = chain_name.split(":", 2)
+        await nc.providers.task_processing.unregister("test")
     return ""
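
Note (not part of the applied diff): the worker loop in this patch is intentionally stubbed out. The langchain execution is fenced off inside a string literal, a TODO marks the missing piece, and a fixed result = "test" is reported. Below is a minimal sketch of how the completed loop might look. It only assumes what this patch itself shows: next_task()/report_result()/register() keep the signatures used above, generate_chains() is re-enabled, the task fields ("id", "chain", "prompt") carry over from the old text_processing payload, next_task() returns a falsy value when the queue is empty, and chain_load() yields the chain plus a start timestamp (suggested by the old code's use of time_start). All of these are assumptions, not confirmed API behavior:

    import time

    from chains import generate_chains  # assumed helper module, mirroring lib/main.py
    from nc_py_api import NextcloudApp

    chains = generate_chains()

    def background_worker():
        nc = NextcloudApp()
        while True:
            # Poll Nextcloud for the next queued task of the registered type.
            task = nc.providers.task_processing.next_task("core:text2text")
            if not task:
                # Assumption: falsy means no work queued; back off before polling again.
                time.sleep(5)
                continue
            try:
                chain_load = chains.get(task.get("chain"))
                if chain_load is None:
                    nc.providers.task_processing.report_result(
                        task["id"], error="Requested model is not available"
                    )
                    continue
                # Assumption: the loader lazily instantiates the model, as in the old flow.
                chain, _time_start = chain_load()
                result = chain.invoke(task.get("prompt")).get("text")
                del chain  # free the model between tasks
                nc.providers.task_processing.report_result(
                    task["id"],
                    str(result).split(sep="<|assistant|>", maxsplit=1)[-1].strip(),
                )
            except Exception as e:  # pylint: disable=broad-except
                nc.providers.task_processing.report_result(task["id"], error=str(e))

Compared to the removed /chain/{chain_name} endpoint, this design pulls work from Nextcloud instead of having Nextcloud push prompts into the local TASK_LIST queue, which is why the Input model, the HTTP route, and the 429 queue-full handling can all be dropped.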