diff --git a/backend/backend.py b/backend/backend.py
index 48394e4..c230497 100644
--- a/backend/backend.py
+++ b/backend/backend.py
@@ -5,6 +5,7 @@
 from modules.rag_llm import *
 from modules.utils import *
 from tenacity import retry, retry_if_exception_type, stop_after_attempt
+from pathlib import Path
 
 app = FastAPI()
 # Config and DB
@@ -18,7 +19,12 @@
 # load the persistent database using ChromaDB
 print('Loading DB')
 client = chromadb.PersistentClient(path=config["persist_dir"])
+
+# Loading the metadata for all types
+data_metadata_path = Path(config["data_dir"]) / "all_dataset_description.csv"
+flow_metadata_path = Path(config["data_dir"]) / "all_flow_description.csv"
+data_metadata = pd.read_csv(data_metadata_path)
+flow_metadata = pd.read_csv(flow_metadata_path)
 
 # Setup llm chain, initialize the retriever and llm, and setup Retrieval QA
 print('Setting LLM chain')
@@ -64,15 +70,17 @@ async def read_dataset(query: str):
     try:
         # Fetch the result data frame based on the query
-        _, ids_order = QueryProcessor(
+        result = QueryProcessor(
             query=query,
             qa=qa_dataset,
             type_of_query='dataset',
             config=config,
+            dataset_meta=data_metadata,
+            flow_meta=flow_metadata,
         ).get_result_from_query()
 
         response = JSONResponse(
-            content={"initial_response": ids_order}, status_code=200
+            content={"initial_response": result}, status_code=200
         )
 
         return response
@@ -81,21 +89,21 @@ async def read_dataset(query: str):
         return JSONResponse(content={"error": str(e)}, status_code=500)
 
 
-@app.get("/flow/{query}", response_class=JSONResponse)
-@retry(retry=retry_if_exception_type(ConnectTimeout), stop=stop_after_attempt(2))
-async def read_flow(query: str):
-    try:
-        _, ids_order = QueryProcessor(
-            query=query,
-            qa=qa_flow,
-            type_of_query='flow',
-            config=config,
-        ).get_result_from_query()
-
-        response = JSONResponse(
-            content={"initial_response": ids_order}, status_code=200
-        )
-
-        return response
-    except Exception as e:
-        return JSONResponse(content={"error": str(e)}, status_code=500)
+# @app.get("/flow/{query}", response_class=JSONResponse)
+# @retry(retry=retry_if_exception_type(ConnectTimeout), stop=stop_after_attempt(2))
+# async def read_flow(query: str):
+#     try:
+#         _, ids_order = QueryProcessor(
+#             query=query,
+#             qa=qa_flow,
+#             type_of_query='flow',
+#             config=config,
+#         ).get_result_from_query()
+#
+#         response = JSONResponse(
+#             content={"initial_response": ids_order}, status_code=200
+#         )
+#
+#         return response
+#     except Exception as e:
+#         return JSONResponse(content={"error": str(e)}, status_code=500)
diff --git a/backend/modules/rag_llm.py b/backend/modules/rag_llm.py
index 23606cc..9e74cfa 100644
--- a/backend/modules/rag_llm.py
+++ b/backend/modules/rag_llm.py
@@ -70,7 +70,7 @@ def setup_vector_db_and_qa(self):
         )
 
         vector_store_manager = VectorStoreManager(self.client, self.config)
-        vectordb = vector_store_manager.create_vector_store(metadata_df)
+        vectordb = vector_store_manager.create_vector_store(metadata_df,self.data_type)
         qa = LLMChainInitializer.initialize_llm_chain(vectordb, self.config)
         return qa, all_metadata
diff --git a/backend/modules/results_gen.py b/backend/modules/results_gen.py
index 9618be8..b1e3607 100644
--- a/backend/modules/results_gen.py
+++ b/backend/modules/results_gen.py
@@ -15,6 +15,9 @@
 from langchain_core.documents import BaseDocumentTransformer, Document
 from tqdm import tqdm
 
+from structured_query.structuring_query import filter_attribute_info
+
+
 # --- PROCESSING RESULTS ---
 
 
@@ -34,11 +37,13 @@ def long_context_reorder(results):
 
 
 class QueryProcessor:
-    def __init__(self, query: str, qa: RetrievalQA, type_of_query: str, config: dict):
+    def __init__(self, query: str, qa: RetrievalQA, type_of_query: str, dataset_meta, flow_meta, config: dict):
         self.query = query
         self.qa = qa
         self.type_of_query = type_of_query
         self.config = config
+        self.dataset_meta = dataset_meta
+        self.flow_meta = flow_meta
 
     def fetch_results(self):
         """
@@ -81,18 +86,15 @@ def fetch_results(self):
 
     @staticmethod
     def process_documents(
-        source_documents: Sequence[Document],
+        source_documents: Sequence[Document],
     ) -> Tuple[OrderedDict, list]:
         """
         Process the source documents and create a dictionary with the key_name as the key and the name and page content as the values.
         """
         dict_results = OrderedDict()
         for result in source_documents:
-            dict_results[result.metadata["did"]] = {
-                "name": result.metadata["name"],
-                "page_content": result.page_content,
-            }
-        ids = [result.metadata["did"] for result in source_documents]
+            dict_results[result.metadata["did"]] = result
+        ids = [(result.metadata["did"], result.metadata['type']) for result in source_documents]
         return dict_results, ids
 
     @staticmethod
@@ -103,7 +105,7 @@ def make_clickable(val: str) -> str:
         return '<a href="{}">{}</a>'.format(val, val)
 
     def create_output_dataframe(
-        self, dict_results: dict, type_of_data: str, ids_order: list
+        self, dict_results: dict, type_of_data: str, ids_order: list
     ) -> pd.DataFrame:
         """
         Create an output dataframe with the results. The URLs are API calls to the OpenML API for the specific type of data.
         """
@@ -151,6 +153,31 @@ def check_query(query: str) -> str:
             query = query[:200]
         return query
 
+    def meta_synthesize(self, ids_order):
+        # Initialize an empty list to store the synthesized rows
+        synthesized_rows = []
+
+        # Iterate over ids_order and append the corresponding row from dataset_meta or flow_meta
+        for did, dtype in ids_order:
+            if dtype == 'dataset':
+                row = self.dataset_meta[self.dataset_meta['did'] == did].copy()
+                if not row.empty:
+                    row.loc[:, 'type'] = 'dataset'
+                    synthesized_rows.append(row.iloc[0])
+            elif dtype == 'flow':
+                row = self.flow_meta[self.flow_meta['did'] == did].copy()
+                if not row.empty:
+                    row.loc[:, 'type'] = 'flow'
+                    synthesized_rows.append(row.iloc[0])
+
+        # Convert the list of rows to a DataFrame
+        synthesized_df = pd.DataFrame(synthesized_rows).reset_index(drop=True)
+        # Reorder the columns to place 'type' as the second column
+        cols = synthesized_df.columns.tolist()
+        cols.insert(1, cols.pop(cols.index('type')))
+        synthesized_df = synthesized_df[cols]
+        return synthesized_df
+
     def get_result_from_query(self) -> Tuple[pd.DataFrame, Sequence[Document]]:
         """
         Get the result from the query using the QA chain and return the results in a dataframe that is then sent to the frontend.
@@ -168,6 +195,7 @@ def get_result_from_query(self) -> Tuple[pd.DataFrame, Sequence[Document]]:
         source_documents = self.fetch_results()
         dict_results, ids_order = self.process_documents(source_documents)
 
-        output_df = self.create_output_dataframe(dict_results, type_of_query, ids_order)
-
-        return output_df, ids_order
+        # output_df = self.create_output_dataframe(dict_results, type_of_query, ids_order)
+        output_df = self.meta_synthesize(ids_order)
+        output_json = output_df.to_json(orient="records")
+        return output_json
diff --git a/backend/modules/vector_store_utils.py b/backend/modules/vector_store_utils.py
index ed25cc4..ad5af10 100644
--- a/backend/modules/vector_store_utils.py
+++ b/backend/modules/vector_store_utils.py
@@ -14,11 +14,12 @@ class DataLoader:
     """
    Description: Used to chunk data
     """
-    def __init__(self, metadata_df: pd.DataFrame, page_content_column: str, chunk_size:int = 1000, chunk_overlap:int = 150):
+    def __init__(self, metadata_df: pd.DataFrame, page_content_column: str, chunk_size:int = 1000, chunk_overlap:int = 150, type:str='dataset'):
         self.metadata_df = metadata_df
         self.page_content_column = page_content_column
         self.chunk_size = chunk_size
         self.chunk_overlap = chunk_overlap if self.chunk_size > chunk_overlap else self.chunk_size
+        self.type = type
 
     def load_and_process_data(self) -> list:
         """
@@ -28,6 +29,8 @@ def load_and_process_data(self) -> list:
         """
             self.metadata_df, page_content_column=self.page_content_column
         )
         documents = loader.load()
+        for d in documents:
+            d.metadata['type']=self.type
 
         text_splitter = RecursiveCharacterTextSplitter(
             chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
@@ -122,9 +125,10 @@ def get_collection_name(self) -> str:
         """
         Description: Fixes some collection names. (workaround from OpenML API)
         """
-        return {"dataset": "datasets", "flow": "flows"}.get(
-            self.config["type_of_data"], "default"
-        )
+        return 'default'
+        # return {"dataset": "datasets", "flow": "flows"}.get(
+        #     self.config["type_of_data"], "default"
+        # )
 
     def load_vector_store(
         self, embeddings: HuggingFaceEmbeddings, collection_name: str
@@ -155,7 +159,7 @@ def add_documents_to_db(db, unique_docs, unique_ids, bs = 512):
         for i in range(0, len(unique_docs), bs):
             db.add_documents(unique_docs[i : i + bs], ids=unique_ids[i : i + bs])
 
-    def create_vector_store(self, metadata_df: pd.DataFrame) -> Chroma:
+    def create_vector_store(self, metadata_df: pd.DataFrame, type) -> Chroma:
         """
         Description: Load embeddings, get chunked data, subset if needed , find unique, and then finally add to ChromaDB
         """
@@ -170,7 +174,7 @@ def create_vector_store(self, metadata_df: pd.DataFrame) -> Chroma:
         )
 
         data_loader = DataLoader(
-            metadata_df, page_content_column="Combined_information", chunk_size = self.config["chunk_size"]
+            metadata_df, page_content_column="Combined_information", chunk_size = self.config["chunk_size"], type=type
         )
         documents = data_loader.load_and_process_data()
diff --git a/backend/training.py b/backend/training.py
index c998fb8..3ee30a4 100644
--- a/backend/training.py
+++ b/backend/training.py
@@ -37,11 +37,11 @@
         client=client,
     )
     qa_dataset, _ = qa_dataset_handler.setup_vector_db_and_qa()
-    # Run the test query
-    result_data_frame = QueryProcessor(
-        query=query_test_dict[type_of_data],
-        qa=qa_dataset,
-        type_of_query=type_of_data,
-        config=config,
-    ).get_result_from_query()
-    print(result_data_frame)
+    # # Run the test query
+    # result_data_frame = QueryProcessor(
+    #     query=query_test_dict[type_of_data],
+    #     qa=qa_dataset,
+    #     type_of_query=type_of_data,
+    #     config=config,
+    # ).get_result_from_query()
+    # print(result_data_frame)
diff --git a/frontend/ui.py b/frontend/ui.py
index c524d21..3a1ccd2 100644
--- a/frontend/ui.py
+++ b/frontend/ui.py
@@ -16,13 +16,7 @@
 from structured_query.chroma_store_utilis import *
 collec = load_chroma_metadata()
 
-# Metadata paths
-data_metadata_path = Path(config["data_dir"]) / "all_dataset_description.csv"
-flow_metadata_path = Path(config["data_dir"]) / "all_flow_description.csv"
-# Load metadata
-data_metadata = pd.read_csv(data_metadata_path)
-flow_metadata = pd.read_csv(flow_metadata_path)
 
 
 # Main Streamlit App
 st.title("OpenML AI Search")
@@ -66,7 +60,7 @@
         response_parser.fetch_llm_response(query)
 
     # get updated columns based on llm response
-    results = response_parser.parse_and_update_response(data_metadata)
+    results = response_parser.parse_and_update_response()
 
     # display results in a table
     display_results(results)
diff --git a/frontend/ui_utils.py b/frontend/ui_utils.py
index 0d4161d..c59451d 100644
--- a/frontend/ui_utils.py
+++ b/frontend/ui_utils.py
@@ -168,6 +168,7 @@ def fetch_structured_query(self, query_type: str, query: str):
 
         return self.structured_query_response
 
+
     def database_filter(self, filter_condition, collec):
         """
         Apply database filter on the rag_response
@@ -194,17 +195,17 @@ def fetch_rag_response(self, query_type, query):
             f"{rag_response_path['local']}{query_type.lower()}/{query}",
             json={"query": query, "type": query_type.lower()},
         ).json()
-        doc_set = set()
-        ordered_set = []
-        for docid in self.rag_response['initial_response']:
-            if docid not in doc_set:
-                ordered_set.append(docid)
-                doc_set.add(docid)
-        self.rag_response['initial_response'] = ordered_set
+        # doc_set = set()
+        # ordered_set = []
+        # for docid in self.rag_response['initial_response']:
+        #     if docid not in doc_set:
+        #         ordered_set.append(docid)
+        #         doc_set.add(docid)
+        # self.rag_response['initial_response'] = ordered_set
 
         return self.rag_response
 
-    def parse_and_update_response(self, metadata: pd.DataFrame):
+    def parse_and_update_response(self):
         """
         Description: Parse the response from the RAG and LLM services and update the metadata based on the response.
         Decide which order to apply them
@@ -216,46 +217,30 @@
         if self.apply_llm_before_rag is None or self.llm_response is None:
             print('No LLM filter.')
             print(self.rag_response, flush=True)
-            filtered_metadata = metadata[
-                metadata["did"].isin(self.rag_response["initial_response"])
-            ]
-            filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
-                                                      categories=self.rag_response["initial_response"],
-                                                      ordered=True)
-            filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
-
-            print(filtered_metadata)
-            # if no llm response is required, return the initial response
-            return filtered_metadata
+            return pd.read_json(self.rag_response['initial_response'], orient='records')
 
         elif self.rag_response is not None and self.llm_response is not None:
             if not self.apply_llm_before_rag:
                 print('RAG before LLM filter.')
-                filtered_metadata = metadata[
-                    metadata["did"].isin(self.rag_response["initial_response"])
-                ]
-                filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
-                                                          categories=self.rag_response["initial_response"],
-                                                          ordered=True)
-                filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
+                filtered_metadata = pd.read_json(self.rag_response['initial_response'], orient='records')
                 llm_parser = LLMResponseParser(self.llm_response)
 
                 if self.query_type.lower() == "dataset":
                     llm_parser.get_attributes_from_response()
                     return llm_parser.update_subset_cols(filtered_metadata)
-            elif self.apply_llm_before_rag:
-                print('LLM filter before RAG')
-                llm_parser = LLMResponseParser(self.llm_response)
-                llm_parser.get_attributes_from_response()
-                filtered_metadata = llm_parser.update_subset_cols(metadata)
-                filtered_metadata = filtered_metadata[
-                    metadata["did"].isin(self.rag_response["initial_response"])
-                ]
-                filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
-                                                          categories=self.rag_response["initial_response"],
-                                                          ordered=True)
-                filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
-                return filtered_metadata
+            # elif self.apply_llm_before_rag:
+            #     print('LLM filter before RAG')
+            #     llm_parser = LLMResponseParser(self.llm_response)
+            #     llm_parser.get_attributes_from_response()
+            #     filtered_metadata = llm_parser.update_subset_cols(metadata)
+            #     filtered_metadata = filtered_metadata[
+            #         metadata["did"].isin(self.rag_response["initial_response"])
+            #     ]
+            #     filtered_metadata["did"] = pd.Categorical(filtered_metadata["did"],
+            #                                               categories=self.rag_response["initial_response"],
+            #                                               ordered=True)
+            #     filtered_metadata = filtered_metadata.sort_values("did").reset_index(drop=True)
+            #     return filtered_metadata
 
         elif self.rag_response is not None and self.structured_query_response is not None:
             col_name = ["status", "NumberOfClasses", "NumberOfFeatures", "NumberOfInstances"]
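
Reviewer note (not part of the change): below is a minimal, self-contained sketch of the new backend-to-frontend contract this diff introduces. `process_documents` now returns `(did, type)` pairs, `meta_synthesize` turns them into one metadata row per hit with `type` as the second column, `/dataset/{query}` ships the frame as `to_json(orient="records")`, and the frontend's `parse_and_update_response` reads it back with `pd.read_json`. The tiny DataFrames and IDs in the sketch are made-up stand-ins for `all_dataset_description.csv` / `all_flow_description.csv`.

```python
# Illustrative sketch of the new payload round-trip; the metadata rows are fabricated.
from io import StringIO

import pandas as pd

# Stand-ins for the CSVs loaded in backend.py (data_metadata / flow_metadata).
data_metadata = pd.DataFrame(
    {"did": [2, 31], "name": ["anneal", "credit-g"], "NumberOfInstances": [898, 1000]}
)
flow_metadata = pd.DataFrame({"did": [17], "name": ["sklearn.svm.SVC"]})

# What process_documents() now produces: retrieval order as (did, type) pairs.
ids_order = [(31, "dataset"), (17, "flow"), (2, "dataset")]

# Same row-synthesis logic as QueryProcessor.meta_synthesize().
rows = []
for did, dtype in ids_order:
    meta = data_metadata if dtype == "dataset" else flow_metadata
    row = meta[meta["did"] == did].copy()
    if not row.empty:
        row.loc[:, "type"] = dtype
        rows.append(row.iloc[0])

synthesized_df = pd.DataFrame(rows).reset_index(drop=True)
cols = synthesized_df.columns.tolist()
cols.insert(1, cols.pop(cols.index("type")))               # 'type' becomes the second column
payload = synthesized_df[cols].to_json(orient="records")   # what the endpoint returns as initial_response

# Frontend side (parse_and_update_response): recover the ordered metadata frame.
results = pd.read_json(StringIO(payload), orient="records")
print(results)
```

One small note: recent pandas releases emit a `FutureWarning` when `pd.read_json` is handed a literal JSON string, so wrapping the payload in `StringIO` as in the sketch may also be worth doing inside `parse_and_update_response`.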