diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/.env b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/.env
index 7aba396..0222e88 100644
--- a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/.env
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/.env
@@ -23,3 +23,8 @@ NOTSET = 0
 # allowed values deploy_rag_workflow_with_retry_query_engine,
 # deploy_rag_workflow_with_retry_source_query_engine, deploy_rag_workflow_with_retry_guideline_query_engine
 ENABLED_WORKFLOW='deploy_rag_workflow_with_retry_source_query_engine'
+
+DEFAULT_KAFKA_URL = "localhost:9092"
+WORKFLOW_HOST='127.0.0.1'
+WORKFLOW_PORT=8004
+WORKFLOW_SERVICE_NAME='rag_workflow_with_retry_guideline_query_engine'
diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/deploy_core.py b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/deploy_core.py
new file mode 100644
index 0000000..e806f8a
--- /dev/null
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/deploy_core.py
@@ -0,0 +1,22 @@
+from llama_deploy import (
+    deploy_core,
+    ControlPlaneConfig
+)
+from llama_deploy.message_queues.apache_kafka import KafkaMessageQueueConfig
+from dotenv import load_dotenv, find_dotenv
+import os
+
+
+async def main():
+    _ = load_dotenv(find_dotenv())
+
+    await deploy_core(
+        control_plane_config=ControlPlaneConfig(),
+        message_queue_config=KafkaMessageQueueConfig(url=os.environ.get('DEFAULT_KAFKA_URL')),
+    )
+
+
+if __name__ == "__main__":
+    import asyncio
+
+    asyncio.run(main())
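Note: `deploy_core.py` assumes a Kafka broker is already listening at `DEFAULT_KAFKA_URL`. A quick, illustrative pre-flight check (not part of this change set) can reuse the `aiokafka` dependency pinned in `requirements.txt` below:

```python
# Hypothetical pre-flight check, not included in this diff:
# confirm the Kafka broker named in .env is reachable before deploying.
import asyncio
import os

from aiokafka import AIOKafkaProducer


async def check_broker() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers=os.environ.get("DEFAULT_KAFKA_URL", "localhost:9092")
    )
    await producer.start()  # raises KafkaConnectionError if the broker is unreachable
    await producer.stop()


asyncio.run(check_broker())
```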
diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/deploy_workflow.py b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/deploy_workflow.py
new file mode 100644
index 0000000..8675591
--- /dev/null
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/deploy_workflow.py
@@ -0,0 +1,41 @@
+from llama_deploy import (
+    deploy_workflow,
+    WorkflowServiceConfig,
+    ControlPlaneConfig
+)
+from retry_guideline_query_engine_workflow import build_rag_workflow_with_retry_guideline_query_engine
+from dotenv import load_dotenv, find_dotenv
+import os
+
+
+_ = load_dotenv(find_dotenv())
+
+
+async def deploy_rag_workflow_with_retry_guideline_query_engine():
+    rag_workflow = build_rag_workflow_with_retry_guideline_query_engine()
+    try:
+        await deploy_workflow(
+            workflow=rag_workflow,
+            workflow_config=WorkflowServiceConfig(
+                host=os.environ.get('WORKFLOW_HOST', '127.0.0.1'),
+                port=int(os.environ.get('WORKFLOW_PORT', '8004')),
+                # the service name must match the workflow name that clients call
+                service_name=os.environ.get('WORKFLOW_SERVICE_NAME', 'rag_workflow_with_retry_guideline_query_engine'),
+                description="RAG workflow with a retry guideline query engine",
+            ),
+            # ControlPlaneConfig is controlled by env vars
+            control_plane_config=ControlPlaneConfig()
+        )
+    except Exception as e:
+        print(e)
+
+
+if __name__ == "__main__":
+    import asyncio
+    import nest_asyncio
+
+    nest_asyncio.apply()
+    try:
+        asyncio.run(deploy_rag_workflow_with_retry_guideline_query_engine())
+    except Exception as e:
+        print(e)
diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/events.py b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/events.py
new file mode 100644
index 0000000..26c323a
--- /dev/null
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/events.py
@@ -0,0 +1,9 @@
+from llama_index.core.workflow import Event
+from llama_index.core.base.base_query_engine import BaseQueryEngine
+
+
+class QueryEngineEvent(Event):
+    """Carries the base query engine from the setup step to the query step."""
+
+    base_query_engine: BaseQueryEngine
+
diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/readme.md b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/readme.md
index cd56a29..ee0e535 100644
--- a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/readme.md
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/readme.md
@@ -1,17 +1,14 @@
-## llama-agents
+## llama-workflows with llama-deploy
 
-### install qdrant
-- `docker pull qdrant/qdrant`
-- `docker run -p 6333:6333 -p 6334:6334 \
-   -v $(pwd)/qdrant_storage:/qdrant/storage:z \
-   qdrant/qdrant`
-
-### install ollama
-- navigate to [https://ollama.com/download](https://ollama.com/download)
+### install required software (Ollama and Qdrant)
+- follow the documentation in the root folder
 
 ### how to run llama-agents
 - open `.env`
 - change the `DB_URL`, `DB_API_KEY` and `COLLECTION_NAME` according to you
-- point the right right LLMs (if not local)
+- point to the right LLMs (if not local)
 - `pip install -r requirements.txt`
+- `python deploy_core.py`
+- `python deploy_workflow.py`
 - `python main.py`
+
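Note: the readme's last step runs `main.py`, which this diff does not touch. Against the llama_deploy client API, a minimal `main.py` would look roughly like the sketch below; the query string is made up, and the service name must match `WORKFLOW_SERVICE_NAME` above:

```python
# Sketch of a main.py client; main.py itself is not included in this diff.
from llama_deploy import LlamaDeployClient, ControlPlaneConfig

# connects to the control plane started by deploy_core.py
client = LlamaDeployClient(ControlPlaneConfig())

session = client.create_session()
result = session.run(
    "rag_workflow_with_retry_guideline_query_engine",  # deployed service name
    query="What do the indexed documents say about retries?",
)
print(result)
```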
diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/requirements.txt b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/requirements.txt
index b8d659b..f5b426b 100644
--- a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/requirements.txt
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/requirements.txt
@@ -4,4 +4,6 @@ llama-index-vector-stores-qdrant==0.3.0
 llama-index-llms-ollama==0.3.1
 llama-index-embeddings-fastembed==0.2.0
 llama-index-embeddings-ollama==0.3.0
-llama-index-postprocessor-rankgpt-rerank == 0.2.0
\ No newline at end of file
+llama-index-postprocessor-rankgpt-rerank==0.2.0
+aiokafka==0.11.0
+kafka-python-ng==2.2.2
diff --git a/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/retry_guideline_query_engine_workflow.py b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/retry_guideline_query_engine_workflow.py
new file mode 100644
index 0000000..237e134
--- /dev/null
+++ b/bootstraprag/templates/llamaindex/llama_deploy_with_kafka/retry_guideline_query_engine_workflow.py
@@ -0,0 +1,102 @@
+import os
+import logging
+import qdrant_client
+from llama_index.core.workflow import (
+    Workflow,
+    Context,
+    StartEvent,
+    StopEvent,
+    step
+)
+from llama_index.core.base.base_query_engine import BaseQueryEngine
+from llama_index.core.query_engine import RetryGuidelineQueryEngine
+from llama_index.core import (VectorStoreIndex, Settings, StorageContext, SimpleDirectoryReader)
+from llama_index.core.evaluation import GuidelineEvaluator
+from llama_index.vector_stores.qdrant import QdrantVectorStore
+from llama_index.llms.ollama import Ollama
+from llama_index.embeddings.ollama import OllamaEmbedding
+from llama_index.core.evaluation.guideline import DEFAULT_GUIDELINES
+from dotenv import load_dotenv, find_dotenv
+from events import QueryEngineEvent
+
+_ = load_dotenv(find_dotenv())
+
+logging.basicConfig(level=int(os.environ['INFO']))
+logger = logging.getLogger(__name__)
+
+
+class RAGWorkflowWithRetryGuidelineQueryEngine(Workflow):
+    def __init__(self, index: VectorStoreIndex, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.index: VectorStoreIndex = index
+
+    @step
+    async def create_retry_query_engine(self, ctx: Context, ev: StartEvent) -> QueryEngineEvent | None:
+        """Entry point for RAG, triggered by a StartEvent with `query`."""
+        logger.info(f"creating query engine for query: {ev.get('query')}")
+        query = ev.get("query")
+        no_of_retries = ev.get("no_of_retries", default=3)
+
+        if not query:
+            raise ValueError("Query is required!")
+
+        # store the settings in the global context
+        await ctx.set("query", query)
+        await ctx.set("no_of_retries", no_of_retries)
+
+        base_query_engine = self.index.as_query_engine(llm=Settings.llm, similarity_top_k=2, sparse_top_k=12,
+                                                       vector_store_query_mode="hybrid")
+        return QueryEngineEvent(base_query_engine=base_query_engine)
+
+    @step
+    async def query_with_retry_guideline_query_engine(self, ctx: Context, ev: QueryEngineEvent) -> StopEvent:
+        """Return a response using the retry guideline query engine."""
+        query = await ctx.get("query")
+        no_of_retries = await ctx.get("no_of_retries")
+        base_query_engine: BaseQueryEngine = ev.base_query_engine
+
+        # Guideline eval
+        guideline_eval = GuidelineEvaluator(
+            guidelines=DEFAULT_GUIDELINES + "\nThe response should not be overly long.\n"
+                                            "The response should try to summarize where possible.\n"
+        )  # just for example
+        retry_guideline_query_engine = RetryGuidelineQueryEngine(base_query_engine, guideline_eval,
+                                                                 resynthesize_query=True, max_retries=no_of_retries)
+        retry_guideline_response = retry_guideline_query_engine.query(query)
+        logger.info(f"response for query is: {retry_guideline_response}")
+        return StopEvent(result=str(retry_guideline_response))
+
+
+def build_rag_workflow_with_retry_guideline_query_engine() -> RAGWorkflowWithRetryGuidelineQueryEngine:
+    index_loaded = False
+    # host points to qdrant in docker-compose.yml
+    client = qdrant_client.QdrantClient(url=os.environ['DB_URL'], api_key=os.environ['DB_API_KEY'])
+    aclient = qdrant_client.AsyncQdrantClient(url=os.environ['DB_URL'], api_key=os.environ['DB_API_KEY'])
+    vector_store = QdrantVectorStore(collection_name=os.environ['COLLECTION_NAME'], client=client, aclient=aclient,
+                                     enable_hybrid=True, batch_size=50)
+
+    Settings.llm = Ollama(model=os.environ['OLLAMA_LLM_MODEL'], base_url=os.environ['OLLAMA_BASE_URL'],
+                          request_timeout=600)
+    Settings.embed_model = OllamaEmbedding(model_name=os.environ['OLLAMA_EMBED_MODEL'],
+                                           base_url=os.environ['OLLAMA_BASE_URL'])
+
+    index: VectorStoreIndex = None
+
+    if client.collection_exists(collection_name=os.environ['COLLECTION_NAME']):
+        try:
+            index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
+            index_loaded = True
+        except Exception as e:
+            logger.error(f"failed to load index from existing collection: {e}")
+            index_loaded = False
+
+    if not index_loaded:
+        # load data
+        _docs = (SimpleDirectoryReader(input_dir='data', required_exts=['.pdf']).load_data(show_progress=True))
+
+        # build and persist index
+        storage_context = StorageContext.from_defaults(vector_store=vector_store)
+        logger.info("indexing the docs in VectorStoreIndex")
+        index = VectorStoreIndex.from_documents(documents=_docs, storage_context=storage_context, show_progress=True)
+
+    return RAGWorkflowWithRetryGuidelineQueryEngine(index=index, timeout=120.0)
diff --git a/setup.py b/setup.py
index 587adbf..d2160ee 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name='bootstrap-rag',
-    version='0.0.2',
+    version='0.0.3',
     packages=find_packages(),
     include_package_data=True,
     install_requires=[
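Note: for local debugging the workflow can also be exercised in-process, without Kafka or the control plane, because `Workflow.run` forwards keyword arguments (`query`, `no_of_retries`) into the `StartEvent`. A speculative smoke-test sketch, not part of this diff:

```python
# Hypothetical smoke test: run the workflow directly, bypassing llama_deploy.
import asyncio

from retry_guideline_query_engine_workflow import (
    build_rag_workflow_with_retry_guideline_query_engine,
)


async def smoke_test() -> None:
    workflow = build_rag_workflow_with_retry_guideline_query_engine()
    # kwargs become StartEvent fields: `query` is required, `no_of_retries` defaults to 3
    result = await workflow.run(query="Summarize the indexed documents.", no_of_retries=2)
    print(result)


asyncio.run(smoke_test())
```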