Commit 675805f

pavanjava and pavanmantha authored
Llama deploy and fastapi support (#24)
* product pitch presentation
* llama-deploy implementation; implemented FastAPI for all templates; partial implementation of Docker for templates
* implemented llama-deploy and workflows with SimpleMQ and RabbitMQ
* modified the docs
* implemented the Kafka queue mechanism
* modified the docs and version

Co-authored-by: pavanmantha <[email protected]>
1 parent 18102d6 commit 675805f

File tree

8 files changed: +188 -12 lines

bootstraprag/templates/llamaindex/llama_deploy_with_kafka/.env (+5)

```diff
@@ -23,3 +23,8 @@ NOTSET = 0
 # allowed values deploy_rag_workflow_with_retry_query_engine,
 # deploy_rag_workflow_with_retry_source_query_engine, deploy_rag_workflow_with_retry_guideline_query_engine
 ENABLED_WORKFLOW='deploy_rag_workflow_with_retry_source_query_engine'
+
+DEFAULT_KAFKA_URL = "localhost:9092"
+WORKFLOW_HOST='127.0.0.1'
+WORKFLOW_PORT=8004
+WORKFLOW_SERVICE_NAME='rag_workflow_with_retry_guideline_query_engine'
```
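Note that the workflow deployment script later in this commit still hardcodes the host, port, and service name these new keys describe. A minimal sketch of reading them from the environment instead, assuming only the `WorkflowServiceConfig` fields already used in this commit:

```python
import os
from dotenv import load_dotenv, find_dotenv
from llama_deploy import WorkflowServiceConfig

_ = load_dotenv(find_dotenv())

# illustrative only: build the service config from the new .env keys
# instead of hardcoding host/port/service name in the deploy script
workflow_config = WorkflowServiceConfig(
    host=os.environ.get("WORKFLOW_HOST", "127.0.0.1"),
    port=int(os.environ.get("WORKFLOW_PORT", "8004")),
    service_name=os.environ.get("WORKFLOW_SERVICE_NAME",
                                "rag_workflow_with_retry_guideline_query_engine"),
    description="RAG workflow",
)
```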
New file (+22):

```python
from llama_deploy import (
    deploy_core,
    ControlPlaneConfig
)
from llama_deploy.message_queues.apache_kafka import KafkaMessageQueueConfig
from dotenv import load_dotenv, find_dotenv
import os


async def main():
    _ = load_dotenv(find_dotenv())

    # start the control plane with Kafka as the message queue
    await deploy_core(
        control_plane_config=ControlPlaneConfig(),
        message_queue_config=KafkaMessageQueueConfig(url=os.environ.get('DEFAULT_KAFKA_URL')),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```
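`deploy_core` brings up the control plane and the Kafka-backed message queue, so a broker must already be listening at `DEFAULT_KAFKA_URL`. A quick pre-flight check, sketched with the `aiokafka` dependency this commit adds to requirements.txt (the helper name is illustrative):

```python
import asyncio
import os
from aiokafka import AIOKafkaProducer


async def kafka_reachable(url: str) -> bool:
    # illustrative helper: try to open a producer connection to the broker
    producer = AIOKafkaProducer(bootstrap_servers=url)
    try:
        await producer.start()
        return True
    except Exception:
        return False
    finally:
        await producer.stop()


if __name__ == "__main__":
    url = os.environ.get("DEFAULT_KAFKA_URL", "localhost:9092")
    print(f"Kafka reachable at {url}: {asyncio.run(kafka_reachable(url))}")
```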
New file (+39):

```python
from llama_deploy import (
    deploy_workflow,
    WorkflowServiceConfig,
    ControlPlaneConfig
)
from retry_guideline_query_engine_workflow import build_rag_workflow_with_retry_guideline_query_engine
from dotenv import load_dotenv, find_dotenv


_ = load_dotenv(find_dotenv())


async def deploy_rag_workflow_with_retry_guideline_query_engine():
    rag_workflow = build_rag_workflow_with_retry_guideline_query_engine()
    try:
        await deploy_workflow(
            workflow=rag_workflow,
            workflow_config=WorkflowServiceConfig(
                host="127.0.0.1",
                port=8004,
                # service name matches the name of the workflow used in the agentic workflow
                service_name="rag_workflow_with_retry_guideline_query_engine",
                description="RAG workflow",
            ),
            # config controlled by env vars
            control_plane_config=ControlPlaneConfig()
        )
    except Exception as e:
        print(e)


if __name__ == "__main__":
    import asyncio
    import nest_asyncio

    nest_asyncio.apply()
    try:
        asyncio.run(deploy_rag_workflow_with_retry_guideline_query_engine())
    except Exception as e:
        print(e)
```
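With the core and the workflow service both running, a client can drive the workflow through the control plane. A minimal sketch, assuming llama-deploy's `LlamaDeployClient` sync client with its default control-plane settings:

```python
from llama_deploy import LlamaDeployClient, ControlPlaneConfig

# connect to the control plane started by the core deployment script
client = LlamaDeployClient(ControlPlaneConfig())
session = client.create_session()

# the first argument must match service_name in WorkflowServiceConfig;
# keyword arguments surface on the workflow's StartEvent
result = session.run(
    "rag_workflow_with_retry_guideline_query_engine",
    query="Summarize the indexed documents.",
)
print(result)
```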
New file (+9):

```python
from llama_index.core.workflow import Event
from llama_index.core.base.base_query_engine import BaseQueryEngine


class QueryEngineEvent(Event):
    """Result of running retrieval"""

    base_query_engine: BaseQueryEngine
```
Modified file (+7 -10):

```diff
@@ -1,17 +1,14 @@
-## llama-agents
+## llama-workflows with llama-deploy
 
-### install qdrant
-- `docker pull qdrant/qdrant`
-- `docker run -p 6333:6333 -p 6334:6334 \
-    -v $(pwd)/qdrant_storage:/qdrant/storage:z \
-    qdrant/qdrant`
-
-### install ollama
-- navigate to [https://ollama.com/download](https://ollama.com/download)
+### install Required Software (Ollama and Qdrant)
+- follow the documentation from the root folder
 
 ### how to run llama-agents
 - open `.env`
 - change the `DB_URL`, `DB_API_KEY` and `COLLECTION_NAME` according to you
-- point the right right LLMs (if not local)
+- point the right LLMs (if not local)
 - `pip install -r requirements.txt`
+- `python deploy_code.py`
+- `python deploy_workflow.py`
 - `python main.py`
+
```

bootstraprag/templates/llamaindex/llama_deploy_with_kafka/requirements.txt (+3 -1)

```diff
@@ -4,4 +4,6 @@ llama-index-vector-stores-qdrant==0.3.0
 llama-index-llms-ollama==0.3.1
 llama-index-embeddings-fastembed==0.2.0
 llama-index-embeddings-ollama==0.3.0
-llama-index-postprocessor-rankgpt-rerank == 0.2.0
+llama-index-postprocessor-rankgpt-rerank == 0.2.0
+aiokafka==0.11.0
+kafka-python-ng==2.2.2
```
New file (+102):

```python
import os
import logging
import qdrant_client
from llama_index.core.workflow import (
    Workflow,
    Context,
    StartEvent,
    StopEvent,
    step
)
from llama_index.core.base.base_query_engine import BaseQueryEngine
from llama_index.core.query_engine import RetryGuidelineQueryEngine
from llama_index.core import (VectorStoreIndex, Settings, StorageContext, SimpleDirectoryReader)
from llama_index.core.evaluation import GuidelineEvaluator
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.evaluation.guideline import DEFAULT_GUIDELINES
from dotenv import load_dotenv, find_dotenv
from events import QueryEngineEvent

_ = load_dotenv(find_dotenv())

# the .env defines numeric log levels (e.g. NOTSET = 0); INFO is read from there
logging.basicConfig(level=int(os.environ['INFO']))
logger = logging.getLogger(__name__)


class RAGWorkflowWithRetryGuidelineQueryEngine(Workflow):
    def __init__(self, index: VectorStoreIndex, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.index: VectorStoreIndex = index

    @step
    async def create_retry_query_engine(self, ctx: Context, ev: StartEvent) -> QueryEngineEvent | None:
        """Entry point for RAG, triggered by a StartEvent with `query`."""
        logger.info(f"creating query engine for query: {ev.get('query')}")
        query = ev.get("query")
        no_of_retries = ev.get("no_of_retries", default=3)

        if not query:
            raise ValueError("Query is required!")

        # store the settings in the global context
        await ctx.set("query", query)
        await ctx.set("no_of_retries", no_of_retries)

        base_query_engine = self.index.as_query_engine(llm=Settings.llm, similarity_top_k=2, sparse_top_k=12,
                                                       vector_store_query_mode="hybrid")
        return QueryEngineEvent(base_query_engine=base_query_engine)

    @step
    async def query_with_retry_source_query_engine(self, ctx: Context, ev: QueryEngineEvent) -> StopEvent:
        """Return a response using the retry guideline query engine."""
        query = await ctx.get("query")
        no_of_retries = await ctx.get("no_of_retries")
        base_query_engine: BaseQueryEngine = ev.base_query_engine

        # guideline eval
        guideline_eval = GuidelineEvaluator(
            guidelines=DEFAULT_GUIDELINES + "\nThe response should not be overly long.\n"
                                            "The response should try to summarize where possible.\n"
        )  # just for example
        retry_guideline_query_engine = RetryGuidelineQueryEngine(base_query_engine, guideline_eval,
                                                                 resynthesize_query=True, max_retries=no_of_retries)
        retry_guideline_response = retry_guideline_query_engine.query(query)
        logger.info(f"response for query is: {retry_guideline_response}")
        return StopEvent(result=str(retry_guideline_response))


def build_rag_workflow_with_retry_guideline_query_engine() -> RAGWorkflowWithRetryGuidelineQueryEngine:
    index_loaded = False
    # host points to qdrant in docker-compose.yml
    client = qdrant_client.QdrantClient(url=os.environ['DB_URL'], api_key=os.environ['DB_API_KEY'])
    aclient = qdrant_client.AsyncQdrantClient(url=os.environ['DB_URL'], api_key=os.environ['DB_API_KEY'])
    vector_store = QdrantVectorStore(collection_name=os.environ['COLLECTION_NAME'], client=client, aclient=aclient,
                                     enable_hybrid=True, batch_size=50)

    Settings.llm = Ollama(model=os.environ['OLLAMA_LLM_MODEL'], base_url=os.environ['OLLAMA_BASE_URL'],
                          request_timeout=600)
    Settings.embed_model = OllamaEmbedding(model_name=os.environ['OLLAMA_EMBED_MODEL'],
                                           base_url=os.environ['OLLAMA_BASE_URL'])

    index: VectorStoreIndex | None = None

    # reuse the existing collection if it is already populated
    if client.collection_exists(collection_name=os.environ['COLLECTION_NAME']):
        try:
            index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
            index_loaded = True
        except Exception:
            index_loaded = False

    if not index_loaded:
        # load data and (re)build the index from the PDFs under ./data
        _docs = SimpleDirectoryReader(input_dir='data', required_exts=['.pdf']).load_data(show_progress=True)

        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        logger.info("indexing the docs in VectorStoreIndex")
        index = VectorStoreIndex.from_documents(documents=_docs, storage_context=storage_context, show_progress=True)

    return RAGWorkflowWithRetryGuidelineQueryEngine(index=index, timeout=120.0)
```
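Since `RAGWorkflowWithRetryGuidelineQueryEngine` is a plain llama-index `Workflow`, the module can also be smoke-tested directly, without the control plane or Kafka. A sketch (the query string is illustrative):

```python
import asyncio
from retry_guideline_query_engine_workflow import (
    build_rag_workflow_with_retry_guideline_query_engine,
)


async def smoke_test():
    wf = build_rag_workflow_with_retry_guideline_query_engine()
    # kwargs to run() surface as fields on the StartEvent in the first step
    result = await wf.run(query="What are the key points of the document?", no_of_retries=2)
    print(result)


if __name__ == "__main__":
    asyncio.run(smoke_test())
```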

setup.py (+1 -1)

```diff
@@ -2,7 +2,7 @@
 
 setup(
     name='bootstrap-rag',
-    version='0.0.2',
+    version='0.0.3',
     packages=find_packages(),
     include_package_data=True,
     install_requires=[
```
