Skip to content

Commit

Permalink
Merge pull request #18 from ittia-research/dev
Browse files Browse the repository at this point in the history
change API response to stream
  • Loading branch information
etwk authored Aug 29, 2024
2 parents 724950c + a0b7d70 commit 76f2b24
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 36 deletions.
47 changes: 35 additions & 12 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import asyncio
import json
from fastapi import FastAPI, HTTPException, Request
from fastapi import FastAPI, HTTPException, Request, Header
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import Response, JSONResponse, HTMLResponse, PlainTextResponse, FileResponse
from fastapi.responses import Response, JSONResponse, HTMLResponse, PlainTextResponse, FileResponse, StreamingResponse
import logging

import utils, pipeline
import pipeline, utils, web
from modules import Search
from settings import settings

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI()

"""
Process input string, fact-check and output MARKDOWN
"""
Expand Down Expand Up @@ -71,34 +75,53 @@ async def fact_check(input):
report = utils.generate_report_markdown(input, verdicts)
return report

app = FastAPI()

# TODO: multi-stage response
async def stream_response(path):
    """Stream fact-check progress for *path* as JSON message strings.

    Runs ``fact_check`` as a background task and, while it is pending,
    yields a 'processing' heartbeat every 30 seconds (starting immediately)
    so the connection stays alive (avoids proxy/CDN idle timeouts).
    When the task completes, yields one 'final' message with the report.

    Raises:
        TimeoutError: if the task runs longer than settings.STREAM_TIME_OUT seconds.
    """
    task = asyncio.create_task(fact_check(path))

    check_interval = 0.2  # seconds between task polls
    heartbeat_every = int(30 / check_interval)  # one heartbeat per 30 s of polling
    # Env-sourced settings may arrive as strings — normalize once up front.
    # NOTE(review): presumably STREAM_TIME_OUT should be cast in settings.py; confirm.
    timeout_seconds = float(settings.STREAM_TIME_OUT)

    # Integer tick counter avoids float-accumulation drift that the original
    # `elapsed_time % 30 == 0` float check depended on.
    ticks = 0
    while not task.done():
        if ticks * check_interval > timeout_seconds:  # waiting timeout
            task.cancel()  # don't leave the fact-check running after we give up
            raise TimeoutError(f"Waiting for fact check results reached time limit: {settings.STREAM_TIME_OUT} seconds")
        if ticks % heartbeat_every == 0:  # return wait messages from time to time
            yield utils.get_stream(stage='processing', content='### Processing ...')
        await asyncio.sleep(check_interval)
        ticks += 1

    result = await task
    yield utils.get_stream(stage='final', content=result)

@app.on_event("startup")
async def startup_event():
    # No startup work needed yet; kept as an extension point.
    # NOTE(review): FastAPI's on_event hooks are deprecated in favor of
    # lifespan handlers — consider migrating when touching this next.
    pass

@app.get("/robots.txt", response_class=FileResponse)
async def robots():
    """Serve robots.txt from the web assets directory.

    Diff residue left two return statements here (old ``assets/`` path and
    new ``web/`` path); only the updated path is kept.
    """
    return "web/robots.txt"

@app.get("/health")
async def health():
    """Liveness probe: always reports the service as up."""
    return dict(status="ok")

@app.get("/status")
async def status():
    """Report current service status (tech-stack configuration)."""
    # get_status became synchronous in this change — no await needed;
    # the stale `await` variant from the diff is dropped.
    _status = utils.get_status()
    return _status

@app.get("/{path:path}", response_class=PlainTextResponse)
async def catch_all(path: str):
async def catch_all(path: str, accept: str = Header(None)):
try:
if not path:
return await utils.get_homepage()
if not utils.check_input(path):
return HTMLResponse(status_code=404, content="Invalid request") # filter brower background requests
result = await fact_check(path)
return result # HTMLResponse(content=result)

if accept == "text/markdown":
if not path:
return utils.get_stream(stage='final', content=web.get_homepage())
return StreamingResponse(stream_response(path), media_type="text/event-stream")
else:
return HTMLResponse(content=web.html_browser)
except HTTPException as e:
raise e
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_verdict_summary(verdicts_data, statement):
v = verdict['verdict'].lower()
if v in sum_citation:
weight_valid += 1
citation = f"{verdict['citation']}\nsource: {hostname}\n\n"
citation = f"{verdict['citation']} *source: {hostname}*\n\n"
sum_citation[v]['citation'].append(citation)
sum_citation[v]['weight'] += 1
if v == 'true':
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/verdict_citation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

# loading compiled ContextVerdict
optimizer_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f"../optimizers/{settings.OPTIMIZER_FILE_NAME}")
context_verdict = ContextVerdict() # IMPORTANT: needs to initiate before load
context_verdict = ContextVerdict()
context_verdict.load(optimizer_path)

"""
Expand Down
3 changes: 3 additions & 0 deletions src/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,8 @@ def __init__(self):

# concurrency
self.CONCURRENCY_VERDICT = os.environ.get("CONCURRENCY_VERDICT") or 8

# web
self.STREAM_TIME_OUT = os.environ.get("STREAM_TIME_OUT") or 300 # in seconds

settings = Settings()
31 changes: 9 additions & 22 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,7 @@ def check_input(input):

return True

async def get_homepage():
# get tech stack
stack = await get_stack()
md = f"## Tech stack\n"
lines = [md]
lines.extend([f"{key}: {value}" for key, value in stack.items()])
md = "\n".join(lines)

md = f"""Fact-check API
[Usage] {settings.PROJECT_HOSTING_BASE_URL}/YOUR_FACT_CHECK_QUERY
[Source] https://github.com/ittia-research/check
{md}
"""
return md

async def get_stack():
def get_stack():
# current tech stack
stack = {
"LLM model": settings.LLM_MODEL_NAME,
Expand All @@ -106,8 +88,8 @@ async def get_stack():
}
return stack

async def get_status():
stack = await get_stack()
def get_status():
stack = get_stack()
status = {
"stack": stack
}
Expand Down Expand Up @@ -168,4 +150,9 @@ def retry_log_warning(retry_state):
def get_md5(input):
    """Return the MD5 hex digest of a string (UTF-8 encoded).

    Diff residue duplicated the return line; a single return is kept.
    Note: `input` shadows the builtin, but the name is preserved for
    backward compatibility with existing callers.
    """
    return hashlib.md5(input.encode()).hexdigest()

# generate str for stream
def get_stream(stage: str = 'wait', content = None):
    """Serialize a single stream message (stage + content) as a JSON string."""
    return json.dumps({"stage": stage, "content": content})
2 changes: 2 additions & 0 deletions src/web/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .homepage import get_homepage
from .html import html_browser
20 changes: 20 additions & 0 deletions src/web/homepage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import utils
from settings import settings

def get_homepage():
    """Build the API homepage as markdown: usage header plus tech-stack list."""
    tech_stack = utils.get_stack()

    # Tech-stack section: heading followed by one bold key/value pair per entry.
    sections = [f"## Tech stack\n"]
    for key, value in tech_stack.items():
        sections.append(f"**{key}**: {value}")
    stack_md = "\n\n".join(sections)

    return f"""# Fact-check API
**Usage**: {settings.PROJECT_HOSTING_BASE_URL}/YOUR_FACT_CHECK_QUERY
**Source**: https://github.com/ittia-research/check
{stack_md}
"""
55 changes: 55 additions & 0 deletions src/web/html.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
The return of requests from browser.
It will fetch the same url with markdown header and process content in the streams as markdown.
If `stage` in the stream in `wait`, this stream will be skipped.
Purpose of this setup:
- Render multiple stages as markdown.
- Avoid CloudFlare 524 error when use CloudFlare as CDN.
"""
html_browser = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Fact-check</title>
</head>
<body>
<div id="content"></div>
<script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script>
<script>
async function fetchData() {
const response = await fetch(window.location.href, {
headers: {
"Accept": "text/markdown"
}
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
let part = decoder.decode(value, { stream: true });
console.log(part)
try {
const message = JSON.parse(part);
if (message.stage != "wait") {
document.getElementById('content').innerHTML = marked.parse(message.content);
}
} catch (e) {
console.error("Error parsing JSON:", e);
}
}
}
fetchData();
</script>
</body>
</html>
"""
File renamed without changes.

0 comments on commit 76f2b24

Please sign in to comment.