combine dataset and flow search #42

Open · wants to merge 1 commit into main
48 changes: 28 additions & 20 deletions backend/backend.py
@@ -5,6 +5,7 @@
from modules.rag_llm import *
from modules.utils import *
from tenacity import retry, retry_if_exception_type, stop_after_attempt
from pathlib import Path

app = FastAPI()
# Config and DB
@@ -18,7 +19,12 @@
# load the persistent database using ChromaDB
print('Loading DB')
client = chromadb.PersistentClient(path=config["persist_dir"])

# Loading the metadata for all types
data_metadata_path = Path(config["data_dir"]) / "all_dataset_description.csv"
flow_metadata_path = Path(config["data_dir"]) / "all_flow_description.csv"
data_metadata = pd.read_csv(data_metadata_path)
flow_metadata = pd.read_csv(flow_metadata_path)

# Setup llm chain, initialize the retriever and llm, and setup Retrieval QA
print('Setting LLM chain')
@@ -64,15 +70,17 @@
async def read_dataset(query: str):
try:
# Fetch the result data frame based on the query
_, ids_order = QueryProcessor(
result = QueryProcessor(
query=query,
qa=qa_dataset,
type_of_query='dataset',
config=config,
dataset_meta=data_metadata,
flow_meta=flow_metadata,
).get_result_from_query()

response = JSONResponse(
content={"initial_response": ids_order}, status_code=200
content={"initial_response": result}, status_code=200
)

return response
@@ -81,21 +89,21 @@ async def read_dataset(query: str):
return JSONResponse(content={"error": str(e)}, status_code=500)


@app.get("/flow/{query}", response_class=JSONResponse)
@retry(retry=retry_if_exception_type(ConnectTimeout), stop=stop_after_attempt(2))
async def read_flow(query: str):
try:
_, ids_order = QueryProcessor(
query=query,
qa=qa_flow,
type_of_query='flow',
config=config,
).get_result_from_query()

response = JSONResponse(
content={"initial_response": ids_order}, status_code=200
)

return response
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
# @app.get("/flow/{query}", response_class=JSONResponse)
# @retry(retry=retry_if_exception_type(ConnectTimeout), stop=stop_after_attempt(2))
# async def read_flow(query: str):
# try:
# _, ids_order = QueryProcessor(
# query=query,
# qa=qa_flow,
# type_of_query='flow',
# config=config,
# ).get_result_from_query()
#
# response = JSONResponse(
# content={"initial_response": ids_order}, status_code=200
# )
#
# return response
# except Exception as e:
# return JSONResponse(content={"error": str(e)}, status_code=500)
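
Reviewer note: a minimal client-side sketch of the combined endpoint's new contract, assuming the app runs locally on the default Uvicorn port (not specified in this PR). The `initial_response` field is taken from the diff above; the record keys `did`, `type`, and `name` are assumptions about the merged metadata frames.

```python
# Hypothetical smoke test for the combined /dataset/{query} endpoint.
# Host and port are assumptions; the PR does not pin deployment details.
import json
import requests

resp = requests.get("http://localhost:8000/dataset/credit%20risk", timeout=30)
payload = resp.json()

# Per the diff, "initial_response" now carries a JSON string of merged
# dataset/flow metadata records instead of a bare list of ids.
records = json.loads(payload["initial_response"])
for rec in records[:5]:
    print(rec.get("did"), rec.get("type"), rec.get("name"))
```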
2 changes: 1 addition & 1 deletion backend/modules/rag_llm.py
@@ -70,7 +70,7 @@ def setup_vector_db_and_qa(self):
)

vector_store_manager = VectorStoreManager(self.client, self.config)
vectordb = vector_store_manager.create_vector_store(metadata_df)
vectordb = vector_store_manager.create_vector_store(metadata_df,self.data_type)
qa = LLMChainInitializer.initialize_llm_chain(vectordb, self.config)

return qa, all_metadata
50 changes: 39 additions & 11 deletions backend/modules/results_gen.py
@@ -15,6 +15,9 @@
from langchain_core.documents import BaseDocumentTransformer, Document
from tqdm import tqdm

from structured_query.structuring_query import filter_attribute_info


# --- PROCESSING RESULTS ---


@@ -34,11 +37,13 @@ def long_context_reorder(results):


class QueryProcessor:
def __init__(self, query: str, qa: RetrievalQA, type_of_query: str, config: dict):
def __init__(self, query: str, qa: RetrievalQA, type_of_query: str, dataset_meta, flow_meta, config: dict):
self.query = query
self.qa = qa
self.type_of_query = type_of_query
self.config = config
self.dataset_meta = dataset_meta
self.flow_meta = flow_meta

def fetch_results(self):
"""
@@ -81,18 +86,15 @@ def fetch_results(self):

@staticmethod
def process_documents(
source_documents: Sequence[Document],
source_documents: Sequence[Document],
) -> Tuple[OrderedDict, list]:
"""
Process the source documents and create a dictionary with the key_name as the key and the name and page content as the values.
"""
dict_results = OrderedDict()
for result in source_documents:
dict_results[result.metadata["did"]] = {
"name": result.metadata["name"],
"page_content": result.page_content,
}
ids = [result.metadata["did"] for result in source_documents]
dict_results[result.metadata["did"]] = result
ids = [(result.metadata["did"], result.metadata['type']) for result in source_documents]
return dict_results, ids

@staticmethod
Expand All @@ -103,7 +105,7 @@ def make_clickable(val: str) -> str:
return '<a href="{}">{}</a>'.format(val, val)

def create_output_dataframe(
self, dict_results: dict, type_of_data: str, ids_order: list
self, dict_results: dict, type_of_data: str, ids_order: list
) -> pd.DataFrame:
"""
Create an output dataframe with the results. The URLs are API calls to the OpenML API for the specific type of data.
@@ -151,6 +153,31 @@ def check_query(query: str) -> str:
query = query[:200]
return query

def meta_synthesize(self, ids_order):
# Initialize an empty list to store the synthesized rows
synthesized_rows = []

# Iterate over ids_order and append the corresponding row from dataset_meta or flow_meta
for did, dtype in ids_order:
if dtype == 'dataset':
row = self.dataset_meta[self.dataset_meta['did'] == did].copy()
if not row.empty:
row.loc[:, 'type'] = 'dataset'
synthesized_rows.append(row.iloc[0])
elif dtype == 'flow':
row = self.flow_meta[self.flow_meta['did'] == did].copy()
if not row.empty:
row.loc[:, 'type'] = 'flow'
synthesized_rows.append(row.iloc[0])

# Convert the list of rows to a DataFrame
synthesized_df = pd.DataFrame(synthesized_rows).reset_index(drop=True)
# Reorder the columns to place 'type' as the second column
cols = synthesized_df.columns.tolist()
cols.insert(1, cols.pop(cols.index('type')))
synthesized_df = synthesized_df[cols]
return synthesized_df

def get_result_from_query(self) -> Tuple[pd.DataFrame, Sequence[Document]]:
"""
Get the result from the query using the QA chain and return the results in a dataframe that is then sent to the frontend.
@@ -168,6 +195,7 @@ def get_result_from_query(self) -> Tuple[pd.DataFrame, Sequence[Document]]:

source_documents = self.fetch_results()
dict_results, ids_order = self.process_documents(source_documents)
output_df = self.create_output_dataframe(dict_results, type_of_query, ids_order)

return output_df, ids_order
# output_df = self.create_output_dataframe(dict_results, type_of_query, ids_order)
output_df = self.meta_synthesize(ids_order)
output_json = output_df.to_json(orient="records")
return output_json
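
Reviewer note: a standalone illustration of the ordering behaviour `meta_synthesize` implements, using toy metadata frames. Column names other than `did` are illustrative only.

```python
# Rows are pulled from the dataset/flow metadata frames in retrieval order,
# tagged with a 'type' column, and 'type' is moved to the second position.
import pandas as pd

dataset_meta = pd.DataFrame({"did": [5, 7], "name": ["iris", "adult"]})
flow_meta = pd.DataFrame({"did": [7], "name": ["sklearn.RandomForest"]})
ids_order = [(7, "flow"), (5, "dataset"), (7, "dataset")]

rows = []
for did, dtype in ids_order:
    source = dataset_meta if dtype == "dataset" else flow_meta
    row = source[source["did"] == did].copy()
    if not row.empty:
        row.loc[:, "type"] = dtype
        rows.append(row.iloc[0])

merged = pd.DataFrame(rows).reset_index(drop=True)
cols = merged.columns.tolist()
cols.insert(1, cols.pop(cols.index("type")))
print(merged[cols])
# Retrieval order is preserved: flow 7, dataset 5, dataset 7.
```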
16 changes: 10 additions & 6 deletions backend/modules/vector_store_utils.py
@@ -14,11 +14,12 @@ class DataLoader:
"""
Description: Used to chunk data
"""
def __init__(self, metadata_df: pd.DataFrame, page_content_column: str, chunk_size:int = 1000, chunk_overlap:int = 150):
def __init__(self, metadata_df: pd.DataFrame, page_content_column: str, chunk_size:int = 1000, chunk_overlap:int = 150, type:str='dataset'):
self.metadata_df = metadata_df
self.page_content_column = page_content_column
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap if self.chunk_size > chunk_overlap else self.chunk_size
self.type = type

def load_and_process_data(self) -> list:
"""
@@ -28,6 +29,8 @@ def load_and_process_data(self) -> list:
self.metadata_df, page_content_column=self.page_content_column
)
documents = loader.load()
for d in documents:
d.metadata['type']=self.type

text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
@@ -122,9 +125,10 @@ def get_collection_name(self) -> str:
"""
Description: Fixes some collection names. (workaround from OpenML API)
"""
return {"dataset": "datasets", "flow": "flows"}.get(
self.config["type_of_data"], "default"
)
return 'default'
# return {"dataset": "datasets", "flow": "flows"}.get(
# self.config["type_of_data"], "default"
# )

def load_vector_store(
self, embeddings: HuggingFaceEmbeddings, collection_name: str
@@ -155,7 +159,7 @@ def add_documents_to_db(db, unique_docs, unique_ids, bs = 512):
for i in range(0, len(unique_docs), bs):
db.add_documents(unique_docs[i : i + bs], ids=unique_ids[i : i + bs])

def create_vector_store(self, metadata_df: pd.DataFrame) -> Chroma:
def create_vector_store(self, metadata_df: pd.DataFrame, type) -> Chroma:
"""
Description: Load embeddings, get chunked data, subset if needed , find unique, and then finally add to ChromaDB
"""
@@ -170,7 +174,7 @@ def create_vector_store(self, metadata_df: pd.DataFrame) -> Chroma:
)

data_loader = DataLoader(
metadata_df, page_content_column="Combined_information", chunk_size = self.config["chunk_size"]
metadata_df, page_content_column="Combined_information", chunk_size = self.config["chunk_size"], type=type
)
documents = data_loader.load_and_process_data()

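
Reviewer note: the per-document `type` tag added in `load_and_process_data` is what later lets `process_documents` return `(did, type)` pairs. A minimal sketch of the same tagging idea using langchain's `DataFrameLoader` directly, with toy data and the chunking step omitted:

```python
# Minimal sketch of the metadata tagging introduced above, outside the class.
# Assumes langchain_community is installed; the columns are illustrative.
import pandas as pd
from langchain_community.document_loaders import DataFrameLoader

metadata_df = pd.DataFrame(
    {"did": [5], "name": ["iris"], "Combined_information": ["iris flower dataset ..."]}
)
loader = DataFrameLoader(metadata_df, page_content_column="Combined_information")
documents = loader.load()

# Mirror of the new loop in load_and_process_data: every document carries its
# origin type, so datasets and flows can share the single 'default' collection.
for d in documents:
    d.metadata["type"] = "dataset"

print(documents[0].metadata)  # {'did': 5, 'name': 'iris', 'type': 'dataset'}
```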
16 changes: 8 additions & 8 deletions backend/training.py
@@ -37,11 +37,11 @@
client=client,
)
qa_dataset, _ = qa_dataset_handler.setup_vector_db_and_qa()
# Run the test query
result_data_frame = QueryProcessor(
query=query_test_dict[type_of_data],
qa=qa_dataset,
type_of_query=type_of_data,
config=config,
).get_result_from_query()
print(result_data_frame)
# # Run the test query
# result_data_frame = QueryProcessor(
# query=query_test_dict[type_of_data],
# qa=qa_dataset,
# type_of_query=type_of_data,
# config=config,
# ).get_result_from_query()
# print(result_data_frame)
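
Reviewer note: if the commented-out smoke test is revived, it will need the two metadata frames required by the new `QueryProcessor` signature. A hedged sketch, assuming the surrounding `training.py` names (`config`, `qa_dataset`, `query_test_dict`, `type_of_data`) are still in scope:

```python
# Hypothetical replacement for the commented-out smoke test, updated for the
# new QueryProcessor signature. Paths mirror those added to backend.py.
from pathlib import Path
import pandas as pd

data_metadata = pd.read_csv(Path(config["data_dir"]) / "all_dataset_description.csv")
flow_metadata = pd.read_csv(Path(config["data_dir"]) / "all_flow_description.csv")

result_json = QueryProcessor(
    query=query_test_dict[type_of_data],
    qa=qa_dataset,
    type_of_query=type_of_data,
    config=config,
    dataset_meta=data_metadata,
    flow_meta=flow_metadata,
).get_result_from_query()  # now returns a JSON string of merged records
print(result_json)
```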
8 changes: 1 addition & 7 deletions frontend/ui.py
@@ -16,13 +16,7 @@
from structured_query.chroma_store_utilis import *
collec = load_chroma_metadata()

# Metadata paths
data_metadata_path = Path(config["data_dir"]) / "all_dataset_description.csv"
flow_metadata_path = Path(config["data_dir"]) / "all_flow_description.csv"

# Load metadata
data_metadata = pd.read_csv(data_metadata_path)
flow_metadata = pd.read_csv(flow_metadata_path)

# Main Streamlit App
st.title("OpenML AI Search")
@@ -66,7 +60,7 @@
response_parser.fetch_llm_response(query)
# get updated columns based on llm response

results = response_parser.parse_and_update_response(data_metadata)
results = response_parser.parse_and_update_response()
# display results in a table
display_results(results)

63 changes: 24 additions & 39 deletions frontend/ui_utils.py
@@ -168,6 +168,7 @@ def fetch_structured_query(self, query_type: str, query: str):

return self.structured_query_response


def database_filter(self, filter_condition, collec):
"""
Apply database filter on the rag_response
@@ -194,17 +195,17 @@ def fetch_rag_response(self, query_type, query):
f"{rag_response_path['local']}{query_type.lower()}/{query}",
json={"query": query, "type": query_type.lower()},
).json()
doc_set = set()
ordered_set = []
for docid in self.rag_response['initial_response']:
if docid not in doc_set:
ordered_set.append(docid)
doc_set.add(docid)
self.rag_response['initial_response'] = ordered_set
# doc_set = set()
# ordered_set = []
# for docid in self.rag_response['initial_response']:
# if docid not in doc_set:
# ordered_set.append(docid)
# doc_set.add(docid)
# self.rag_response['initial_response'] = ordered_set

return self.rag_response

def parse_and_update_response(self, metadata: pd.DataFrame):
def parse_and_update_response(self):
"""
Description: Parse the response from the RAG and LLM services and update the metadata based on the response.
Decide which order to apply them
@@ -216,46 +217,30 @@
if self.apply_llm_before_rag is None or self.llm_response is None:
print('No LLM filter.')
print(self.rag_response, flush=True)
filtered_metadata = metadata[
metadata["did"].isin(self.rag_response["initial_response"])
]
filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
categories=self.rag_response["initial_response"],
ordered=True)
filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)

print(filtered_metadata)
# if no llm response is required, return the initial response
return filtered_metadata
return pd.read_json(self.rag_response['initial_response'], orient='records')

elif self.rag_response is not None and self.llm_response is not None:
if not self.apply_llm_before_rag:
print('RAG before LLM filter.')
filtered_metadata = metadata[
metadata["did"].isin(self.rag_response["initial_response"])
]
filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
categories=self.rag_response["initial_response"],
ordered=True)
filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
filtered_metadata = pd.read_json(self.rag_response['initial_response'], orient='records')
llm_parser = LLMResponseParser(self.llm_response)

if self.query_type.lower() == "dataset":
llm_parser.get_attributes_from_response()
return llm_parser.update_subset_cols(filtered_metadata)
elif self.apply_llm_before_rag:
print('LLM filter before RAG')
llm_parser = LLMResponseParser(self.llm_response)
llm_parser.get_attributes_from_response()
filtered_metadata = llm_parser.update_subset_cols(metadata)
filtered_metadata = filtered_metadata[
metadata["did"].isin(self.rag_response["initial_response"])
]
filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
categories=self.rag_response["initial_response"],
ordered=True)
filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
return filtered_metadata
# elif self.apply_llm_before_rag:
# print('LLM filter before RAG')
# llm_parser = LLMResponseParser(self.llm_response)
# llm_parser.get_attributes_from_response()
# filtered_metadata = llm_parser.update_subset_cols(metadata)
# filtered_metadata = filtered_metadata[
# metadata["did"].isin(self.rag_response["initial_response"])
# ]
# filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
# categories=self.rag_response["initial_response"],
# ordered=True)
# filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
# return filtered_metadata

elif self.rag_response is not None and self.structured_query_response is not None:
col_name = ["status", "NumberOfClasses", "NumberOfFeatures", "NumberOfInstances"]
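
Reviewer note: a small sketch of what the simplified no-LLM-filter path now does; the frontend materialises the backend's pre-merged, pre-ordered JSON records instead of re-joining against a locally loaded metadata frame. The toy payload and its column names are assumptions.

```python
# The diff passes the raw JSON string straight to pd.read_json; newer pandas
# versions prefer it wrapped in StringIO, as shown here. Toy payload below.
from io import StringIO
import pandas as pd

rag_response = {
    "initial_response": '[{"did": 7, "type": "flow", "name": "sklearn.RandomForest"},'
                        ' {"did": 5, "type": "dataset", "name": "iris"}]'
}

results = pd.read_json(StringIO(rag_response["initial_response"]), orient="records")
print(results)  # already in retrieval order; no categorical re-sorting needed
```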