Merge pull request #19 from ittia-research/dev
move pipelines to one single class, change to streaming search backend
etwk authored Aug 31, 2024
2 parents 76f2b24 + b208fd0 commit 1b7e5cb
Showing 9 changed files with 361 additions and 221 deletions.
2 changes: 2 additions & 0 deletions src/api/__init__.py
@@ -0,0 +1,2 @@
from .fetch import FetchUrl
from .search import SearchWeb
27 changes: 27 additions & 0 deletions src/api/fetch.py
@@ -0,0 +1,27 @@
import httpx
import json
from tenacity import retry, stop_after_attempt, wait_fixed

import utils
from settings import settings

client = httpx.AsyncClient(http2=True, follow_redirects=True)

class FetchUrl():
    """Fetch a single URL via the API fetch endpoint."""

    def __init__(self, url: str):
        self.url = url
        self.api = settings.SEARCH_BASE_URL + '/fetch'
        self.timeout = 120  # API request timeout; set higher because the API backend might need a few attempts

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.1), before_sleep=utils.retry_log_warning, reraise=True)
    async def get(self):
        _data = {
            'url': self.url,
        }
        response = await client.post(self.api, json=_data, timeout=self.timeout)
        _r = response.json()
        if _r['status'] != 'ok':
            raise Exception(f"Fetch URL returned status not ok: {self.url}")
        return _r['data']
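
A minimal usage sketch for this new client (the calling coroutine and example URL below are illustrative, not part of the commit):

    from api import FetchUrl

    async def demo():
        # FetchUrl retries the POST up to 3 times before re-raising.
        fetcher = FetchUrl("https://example.com/article")
        data = await fetcher.get()  # the 'data' field of the API response
        print(data)
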
58 changes: 58 additions & 0 deletions src/api/search.py
@@ -0,0 +1,58 @@
import asyncio
import httpx
import json
from tenacity import retry, stop_after_attempt, wait_fixed

import utils
from settings import settings

class SearchWeb():
    """
    Web search for a query, with session support:
      - get more links following the previous searches
      - get all links of this session
    """
    def __init__(self, query: str):
        self.query = query
        self.api = settings.SEARCH_BASE_URL + '/search'
        self.timeout = 600  # API request timeout; set higher because the search backend might need a few attempts

        self.client = httpx.AsyncClient(http2=True, follow_redirects=True, timeout=self.timeout)
        self.urls = []  # all URLs collected so far

    """
    Get JSON data from the API stream output.
    TODO:
      - Is there a more standard way to process streamed JSON?
    """
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.1), before_sleep=utils.retry_log_warning, reraise=True)
    async def get(self, num: int = 10, all: bool = False):
        _data = {
            'query': self.query,
            'num': num,  # how many more URLs to get
            'all': all,
        }
        async with self.client.stream("POST", self.api, json=_data) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                if chunk.strip():  # only process non-empty chunks
                    buffer += chunk

                    # Attempt to parse the buffer as JSON
                    try:
                        # Keep loading JSON objects until the buffer is consumed
                        while buffer:
                            # Try to decode one complete JSON object
                            rep, index = json.JSONDecoder().raw_decode(buffer)
                            _url = rep['url']
                            # deduplication
                            if _url not in self.urls:  # TODO: what if a new result has the same URL but better metadata?
                                self.urls.append(_url)
                                yield rep

                            # Remove the processed part from the buffer
                            buffer = buffer[index:].lstrip()  # drop processed JSON and any leading whitespace
                    except json.JSONDecodeError:
                        # The buffer may not yet contain a complete JSON object; keep reading
                        continue
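
A sketch of how the streaming generator might be consumed (the query string and surrounding coroutine are illustrative; each yielded object carries at least a 'url' key, per the deduplication logic above):

    from api import SearchWeb

    async def demo():
        search = SearchWeb("sea level rise 2023")
        # First call: stream up to 10 results as they arrive.
        async for result in search.get(num=10):
            print(result['url'])
        # A later call in the same session asks for more links for the same
        # query; URLs already seen are skipped.
        async for result in search.get(num=10):
            print(result['url'])
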
114 changes: 58 additions & 56 deletions src/main.py
@@ -17,67 +17,68 @@

app = FastAPI()

"""
Process input string, fact-check and output MARKDOWN
"""
async def fact_check(input):
status = 500
logger.info(f"Fact checking: {input}")

# get list of statements
try:
statements = await run_in_threadpool(pipeline.get_statements, input)
logger.info(f"statements: {statements}")
except Exception as e:
logger.error(f"Get statements failed: {e}")
raise HTTPException(status_code=status, detail="No statements found")

verdicts = []
fail_search = False
for statement in statements:
if not statement:
continue
logger.info(f"Statement: {statement}")

# get search query
try:
query = await run_in_threadpool(pipeline.get_search_query, statement)
logger.info(f"Search query: {query}")
except Exception as e:
logger.error(f"Getting search query from statement '{statement}' failed: {e}")
continue

# searching
try:
search = await Search(query)
logger.info(f"Head of search results: {json.dumps(search)[0:500]}")
except Exception as e:
fail_search = True
logger.error(f"Search '{query}' failed: {e}")
continue

# get verdict
try:
verdict = await run_in_threadpool(pipeline.get_verdict, search_json=search, statement=statement)
logger.info(f"Verdict: {verdict}")
except Exception as e:
logger.error(f"Getting verdict for statement '{statement}' failed: {e}")
continue
# """
# Process input string, fact-check and output MARKDOWN
# """
# async def fact_check(input):
# status = 500
# logger.info(f"Fact checking: {input}")

# # get list of statements
# try:
# statements = await run_in_threadpool(pipeline.get_statements, input)
# logger.info(f"statements: {statements}")
# except Exception as e:
# logger.error(f"Get statements failed: {e}")
# raise HTTPException(status_code=status, detail="No statements found")

# verdicts = []
# fail_search = False
# for statement in statements:
# if not statement:
# continue
# logger.info(f"Statement: {statement}")

# # get search query
# try:
# query = await run_in_threadpool(pipeline.get_search_query, statement)
# logger.info(f"Search query: {query}")
# except Exception as e:
# logger.error(f"Getting search query from statement '{statement}' failed: {e}")
# continue

# # searching
# try:
# search = await Search(query)
# logger.info(f"Head of search results: {json.dumps(search)[0:500]}")
# except Exception as e:
# fail_search = True
# logger.error(f"Search '{query}' failed: {e}")
# continue

# # get verdict
# try:
# verdict = await run_in_threadpool(pipeline.get_verdict, search_json=search, statement=statement)
# logger.info(f"Verdict: {verdict}")
# except Exception as e:
# logger.error(f"Getting verdict for statement '{statement}' failed: {e}")
# continue

verdicts.append(verdict)
# verdicts.append(verdict)

if not verdicts:
if fail_search:
raise HTTPException(status_code=status, detail="Search not available")
else:
raise HTTPException(status_code=status, detail="No verdicts found")
# if not verdicts:
# if fail_search:
# raise HTTPException(status_code=status, detail="Search not available")
# else:
# raise HTTPException(status_code=status, detail="No verdicts found")

report = utils.generate_report_markdown(input, verdicts)
return report
# report = utils.generate_report_markdown(input, verdicts)
# return report

# TODO: multi-stage response
async def stream_response(path):
task = asyncio.create_task(fact_check(path))
union = pipeline.Union(path)
task = asyncio.create_task(union.final())

# Stream response to prevent timeout, return multi-stage responses
elapsed_time = 0
@@ -109,7 +110,8 @@ async def health():
async def status():
_status = utils.get_status()
return _status


# TODO: integrate error handling with output
@app.get("/{path:path}", response_class=PlainTextResponse)
async def catch_all(path: str, accept: str = Header(None)):
try:
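
The stream_response change above replaces the module-level fact_check coroutine with a single pipeline.Union object, in line with the commit message. Its implementation lives in the pipeline module and is not shown in this excerpt; the skeleton below is hypothetical, inferred only from the call sites union = pipeline.Union(path) and union.final():

    class Union():
        """Hypothetical shape of the consolidated pipeline class (assumption)."""

        def __init__(self, input: str):
            self.input = input

        async def final(self):
            # Presumed flow, mirroring the commented-out fact_check above:
            # extract statements, search, retrieve evidence, judge each
            # statement, then assemble the final report.
            raise NotImplementedError  # placeholder; real logic is in src/pipeline
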
5 changes: 3 additions & 2 deletions src/modules/retrieve.py
@@ -158,11 +158,12 @@ def __init__(
        docs,
        k: Optional[int] = None,
    ):
        self.retriever = LlamaIndexCustomRetriever(docs=docs)

        self.docs = docs
        if k:
            self.k = k

        self.retriever = LlamaIndexCustomRetriever(docs=self.docs)

    @property
    def k(self) -> Optional[int]:
        """Get similarity top k of retriever."""

