Skip to content

Commit

Permalink
Merge pull request #8 from ittia-research/dev
Browse files Browse the repository at this point in the history
add feature index build concurrency
  • Loading branch information
etwk authored Aug 14, 2024
2 parents a9bd755 + 6d0389c commit bb1b4e9
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
EMBEDDING_MODEL_DEPLOY=api
EMBEDDING_MODEL_NAME=jinaai/jina-embeddings-v2-base-en
EMBEDDING_MODEL_NAME=jina/jina-embeddings-v2-base-en
LLM_MODEL_NAME=google/gemma-2-27b-it
OLLAMA_BASE_URL=http://ollama:11434
OPENAI_API_KEY=sk-proj-aaaaaaaaaaaaaaaaa
Expand All @@ -9,4 +9,5 @@ RERANK_MODEL_DEPLOY=local
RERANK_MODEL_NAME=BAAI/bge-reranker-v2-m3
RERANK_BASE_URL=http://xinference:9997/v1
SEARCH_BASE_URL=https://s.jina.ai
THREAD_BUILD_INDEX=12
RAG_CHUNK_SIZES=[4096, 1024, 256]
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ Retrieval
DSPy:
- [ ] make dspy.settings apply to sessions only in order to support multiple retrieve index

### Retrieval
- [ ] Better retrieval solution: high performance, concurrency, multiple index, index editable.

### Toolchain
- [ ] Evaluate MLOps pipeline
- https://kitops.ml
Expand Down
52 changes: 43 additions & 9 deletions src/retrieve.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import os, logging
from typing import Optional
import concurrent.futures

from llama_index.core import (
Document,
Expand Down Expand Up @@ -49,7 +50,30 @@ def __init__(
self.similarity_top_k = similarity_top_k
if docs:
self.build_index(docs)


def create_index(self, nodes):
    """Build a VectorStoreIndex over the leaf nodes of *nodes*.

    Every node (leaves and parents alike) is registered in a fresh
    docstore so parent nodes stay available for later auto-merging,
    while only the leaf nodes are embedded into the vector index.
    """
    context = StorageContext.from_defaults()
    context.docstore.add_documents(nodes)
    leaves = get_leaf_nodes(nodes)
    return VectorStoreIndex(leaves, storage_context=context)

def merge_index(self, indexs):
    """Combine several indexes into a single VectorStoreIndex.

    Each source index's pre-computed embeddings are copied from its
    vector store onto the corresponding docstore nodes, so building
    the merged index does not re-embed anything.

    Args:
        indexs: list of VectorStoreIndex objects to merge.
    """
    merged_nodes = []
    for idx in indexs:
        # embedding_dict maps node id -> stored embedding vector.
        embeddings = idx.storage_context.vector_store.to_dict()['embedding_dict']
        for node_id, node in idx.storage_context.docstore.docs.items():
            # Attach the stored embedding so it is not recomputed.
            node.embedding = embeddings[node_id]
            merged_nodes.append(node)

    return VectorStoreIndex(nodes=merged_nodes)

def build_automerging_index(
self,
documents,
Expand All @@ -62,20 +86,30 @@ def build_automerging_index(
storage_context = StorageContext.from_defaults()
storage_context.docstore.add_documents(nodes)

automerging_index = VectorStoreIndex(
leaf_nodes, storage_context=storage_context
)
return automerging_index
leaf_indexs = []

# TODO: better concurrency, possibly async
with concurrent.futures.ThreadPoolExecutor(max_workers=settings.THREAD_BUILD_INDEX) as executor:
future_to_index = {executor.submit(self.create_index, [_node]): _node for _node in leaf_nodes}

for future in concurrent.futures.as_completed(future_to_index):
index = future.result()
leaf_indexs.append(index)

automerging_index = self.merge_index(leaf_indexs)

return automerging_index, storage_context

def get_automerging_query_engine(
self,
automerging_index,
storage_context,
similarity_top_k=6,
rerank_top_n=3,
):
base_retriever = automerging_index.as_retriever(similarity_top_k=similarity_top_k)
retriever = AutoMergingRetriever(
base_retriever, automerging_index.storage_context, verbose=True
base_retriever, storage_context, verbose=True
)

if settings.RERANK_MODEL_DEPLOY == "local":
Expand All @@ -95,15 +129,15 @@ def build_index(self, docs):
"""Initiate index or build a new one."""

if docs:
index = self.build_automerging_index(
self.index, self.storage_context = self.build_automerging_index(
docs,
chunk_sizes=settings.RAG_CHUNK_SIZES,
) # TODO: try to retrieve directly
self.index = index

def retrieve(self, query):
query_engine = self.get_automerging_query_engine(
self.index,
automerging_index=self.index,
storage_context=self.storage_context,
similarity_top_k=self.similarity_top_k * 3,
rerank_top_n=self.similarity_top_k
)
Expand Down
3 changes: 3 additions & 0 deletions src/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ def __init__(self):
self.RAG_CHUNK_SIZES = ast.literal_eval(_chunk_sizes)
except:
pass

# threads
self.THREAD_BUILD_INDEX = int(os.environ.get("THREAD_BUILD_INDEX", 12))

settings = Settings()

0 comments on commit bb1b4e9

Please sign in to comment.