Llama deploy and fastapi support #24

Merged 7 commits on Sep 22, 2024
@@ -23,3 +23,8 @@ NOTSET = 0
 # allowed values deploy_rag_workflow_with_retry_query_engine,
 # deploy_rag_workflow_with_retry_source_query_engine, deploy_rag_workflow_with_retry_guideline_query_engine
 ENABLED_WORKFLOW='deploy_rag_workflow_with_retry_source_query_engine'
+
+DEFAULT_KAFKA_URL = "localhost:9092"
+WORKFLOW_HOST='127.0.0.1'
+WORKFLOW_PORT=8004
+WORKFLOW_SERVICE_NAME='rag_workflow_with_retry_guideline_query_engine'
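These settings are read with python-dotenv by the new scripts below. A minimal sketch of consuming them (the fallback values are assumptions that mirror the defaults above):

```python
# Minimal sketch: load .env and read the new deployment settings.
# The fallback values are assumptions mirroring the defaults above.
import os

from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

kafka_url = os.environ.get("DEFAULT_KAFKA_URL", "localhost:9092")
host = os.environ.get("WORKFLOW_HOST", "127.0.0.1")
port = int(os.environ.get("WORKFLOW_PORT", "8004"))
service = os.environ.get("WORKFLOW_SERVICE_NAME", "rag_workflow_with_retry_guideline_query_engine")
print(f"deploying {service} on {host}:{port}, Kafka at {kafka_url}")
```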
@@ -0,0 +1,22 @@
from llama_deploy import (
deploy_core,
ControlPlaneConfig
)
from llama_deploy.message_queues.apache_kafka import KafkaMessageQueueConfig
from dotenv import load_dotenv, find_dotenv
import os


async def main():
_ = load_dotenv(find_dotenv())

await deploy_core(
control_plane_config=ControlPlaneConfig(),
message_queue_config=KafkaMessageQueueConfig(url=os.environ.get('DEFAULT_KAFKA_URL')),
)


if __name__ == "__main__":
import asyncio

asyncio.run(main())
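This script (presumably `deploy_code.py` from the README) starts the control plane and wires it to Kafka. Before registering workflow services it can help to confirm the control plane is listening; a small smoke test, assuming `ControlPlaneConfig`'s default address of 127.0.0.1:8000:

```python
# Hypothetical smoke test: wait for the control plane to accept connections.
# The address 127.0.0.1:8000 is an assumption based on ControlPlaneConfig defaults.
import socket
import time

for _ in range(10):
    try:
        with socket.create_connection(("127.0.0.1", 8000), timeout=2):
            print("control plane is up")
            break
    except OSError:
        time.sleep(1)
else:
    print("control plane did not come up after 10 attempts")
```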
@@ -0,0 +1,39 @@
from llama_deploy import (
deploy_workflow,
WorkflowServiceConfig,
ControlPlaneConfig
)
from retry_guideline_query_engine_workflow import build_rag_workflow_with_retry_guideline_query_engine
from dotenv import load_dotenv, find_dotenv


_ = load_dotenv(find_dotenv())


async def deploy_rag_workflow_with_retry_guideline_query_engine():
rag_workflow = build_rag_workflow_with_retry_guideline_query_engine()
try:
await deploy_workflow(
workflow=rag_workflow,
workflow_config=WorkflowServiceConfig(
host="127.0.0.1",
port=8004,
# service name matches the name of the workflow used in Agentic Workflow
service_name="rag_workflow_with_retry_guideline_query_engine",
description="RAG workflow",
),
# Config controlled by env vars
control_plane_config=ControlPlaneConfig()
)
except Exception as e:
print(e)

if __name__ == "__main__":
import asyncio
import nest_asyncio

nest_asyncio.apply()
try:
asyncio.run(deploy_rag_workflow_with_retry_guideline_query_engine())
except Exception as e:
print(e)
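Once the service is registered, callers reach it through the control plane rather than the workflow port directly. A hedged sketch of a client, assuming llama_deploy's synchronous `LlamaDeployClient` with a `create_session()`/`session.run()` API (verify against your installed llama_deploy version):

```python
# Hedged sketch: query the deployed workflow via the control plane.
# LlamaDeployClient, create_session() and session.run() are assumed from
# llama_deploy's client API; verify against your installed version.
from llama_deploy import LlamaDeployClient, ControlPlaneConfig

client = LlamaDeployClient(ControlPlaneConfig())
session = client.create_session()
result = session.run(
    "rag_workflow_with_retry_guideline_query_engine",  # must match service_name above
    query="What do the indexed documents say about retries?",
)
print(result)
```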
@@ -0,0 +1,9 @@
from llama_index.core.workflow import Event
from llama_index.core.base.base_query_engine import BaseQueryEngine


class QueryEngineEvent(Event):
"""Result of running retrieval"""

base_query_engine: BaseQueryEngine

@@ -1,17 +1,14 @@
-## llama-agents
+## llama-workflows with llama-deploy
 
-### install qdrant
-- `docker pull qdrant/qdrant`
-- `docker run -p 6333:6333 -p 6334:6334 \
-   -v $(pwd)/qdrant_storage:/qdrant/storage:z \
-   qdrant/qdrant`
-
-### install ollama
-- navigate to [https://ollama.com/download](https://ollama.com/download)
+### install Required Software (Ollama and Qdrant)
+- follow the documentation from the root folder
 
 ### how to run llama-agents
 - open `.env`
 - change the `DB_URL`, `DB_API_KEY` and `COLLECTION_NAME` according to your setup
-- point the right right LLMs (if not local)
+- point to the right LLMs (if not local)
 - `pip install -r requirements.txt`
+- `python deploy_code.py`
+- `python deploy_workflow.py`
 - `python main.py`

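The run steps above depend on several `.env` keys that the new code reads. An illustrative pre-flight check (not part of the PR) that fails fast when one is missing:

```python
# Illustrative pre-flight check (not part of the PR): verify the .env keys
# the scripts rely on before deploying anything.
import os

from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

required = [
    "DB_URL", "DB_API_KEY", "COLLECTION_NAME",
    "OLLAMA_LLM_MODEL", "OLLAMA_BASE_URL", "OLLAMA_EMBED_MODEL",
    "DEFAULT_KAFKA_URL",
]
missing = [key for key in required if not os.environ.get(key)]
if missing:
    raise SystemExit(f"missing .env keys: {missing}")
```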
@@ -4,4 +4,6 @@ llama-index-vector-stores-qdrant==0.3.0
 llama-index-llms-ollama==0.3.1
 llama-index-embeddings-fastembed==0.2.0
 llama-index-embeddings-ollama==0.3.0
-llama-index-postprocessor-rankgpt-rerank == 0.2.0
+llama-index-postprocessor-rankgpt-rerank == 0.2.0
+aiokafka==0.11.0
+kafka-python-ng==2.2.2
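The two new Kafka dependencies back the `KafkaMessageQueueConfig` used above. An illustrative connectivity check before deploying (the broker address is an assumption matching `DEFAULT_KAFKA_URL` in `.env`):

```python
# Illustrative: confirm the Kafka broker is reachable before deploying.
# Uses aiokafka, added in this PR; the broker address is an assumption
# matching DEFAULT_KAFKA_URL in .env.
import asyncio

from aiokafka import AIOKafkaProducer


async def check_broker(bootstrap: str = "localhost:9092") -> None:
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap)
    await producer.start()  # raises KafkaConnectionError if unreachable
    await producer.stop()
    print("Kafka broker reachable")


asyncio.run(check_broker())
```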
@@ -0,0 +1,102 @@
import os
import logging
import qdrant_client
from llama_index.core.workflow import (
Workflow,
Context,
StartEvent,
StopEvent,
step
)
from llama_index.core.base.base_query_engine import BaseQueryEngine
from llama_index.core.query_engine import RetryGuidelineQueryEngine
from llama_index.core import (VectorStoreIndex, Settings, StorageContext, SimpleDirectoryReader)
from llama_index.core.evaluation import GuidelineEvaluator
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.evaluation.guideline import DEFAULT_GUIDELINES
from dotenv import load_dotenv, find_dotenv
from events import QueryEngineEvent

_ = load_dotenv(find_dotenv())

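# 'INFO' here is a .env key holding a numeric log level (the .env hunk header
# above shows NOTSET = 0), not the logging.INFO constant.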
logging.basicConfig(level=int(os.environ['INFO']))
logger = logging.getLogger(__name__)


class RAGWorkflowWithRetryGuidelineQueryEngine(Workflow):
def __init__(self, index: VectorStoreIndex, *args, **kwargs):
super().__init__(*args, **kwargs)
self.index: VectorStoreIndex = index

@step
async def create_retry_query_engine(self, ctx: Context, ev: StartEvent) -> QueryEngineEvent | None:
"Entry point for RAG, triggered by a StartEvent with `query`."
logger.info(f"creating query engine for query: {ev.get('query')}")
query = ev.get("query")
no_of_retries = ev.get("no_of_retries", default=3)

if not query:
raise ValueError("Query is required!")

# store the settings in the global context
await ctx.set("query", query)
await ctx.set("no_of_retries", no_of_retries)

base_query_engine = self.index.as_query_engine(llm=Settings.llm, similarity_top_k=2, sparse_top_k=12,
vector_store_query_mode="hybrid")
return QueryEngineEvent(base_query_engine=base_query_engine)

@step
async def query_with_retry_source_query_engine(self, ctx: Context, ev: QueryEngineEvent) -> StopEvent:
"""Return a response using reranked nodes."""
query = await ctx.get("query")
no_of_retries = await ctx.get("no_of_retries")
base_query_engine: BaseQueryEngine = ev.base_query_engine

# Guideline eval
guideline_eval = GuidelineEvaluator(
guidelines=DEFAULT_GUIDELINES + "\nThe response should not be overly long.\n"
"The response should try to summarize where possible.\n"
) # just for example
retry_guideline_query_engine = RetryGuidelineQueryEngine(base_query_engine, guideline_eval,
resynthesize_query=True, max_retries=no_of_retries)
retry_guideline_response = retry_guideline_query_engine.query(query)
logger.info(f"response for query is: {retry_guideline_response}")
return StopEvent(result=str(retry_guideline_response))


def build_rag_workflow_with_retry_guideline_query_engine() -> RAGWorkflowWithRetryGuidelineQueryEngine:
index_loaded = False
# host points to qdrant in docker-compose.yml
client = qdrant_client.QdrantClient(url=os.environ['DB_URL'], api_key=os.environ['DB_API_KEY'])
aclient = qdrant_client.AsyncQdrantClient(url=os.environ['DB_URL'], api_key=os.environ['DB_API_KEY'])
vector_store = QdrantVectorStore(collection_name=os.environ['COLLECTION_NAME'], client=client, aclient=aclient,
enable_hybrid=True, batch_size=50)

Settings.llm = Ollama(model=os.environ['OLLAMA_LLM_MODEL'], base_url=os.environ['OLLAMA_BASE_URL'],
request_timeout=600)
Settings.embed_model = OllamaEmbedding(model_name=os.environ['OLLAMA_EMBED_MODEL'],
base_url=os.environ['OLLAMA_BASE_URL'])

# index = VectorStoreIndex.from_vector_store(vector_store=vector_store, embed_model=Settings.embed_model)
index: VectorStoreIndex = None

if client.collection_exists(collection_name=os.environ['COLLECTION_NAME']):
try:
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
index_loaded = True
except Exception as e:
index_loaded = False

if not index_loaded:
# load data
_docs = (SimpleDirectoryReader(input_dir='data', required_exts=['.pdf']).load_data(show_progress=True))

# build and persist index
storage_context = StorageContext.from_defaults(vector_store=vector_store)
logger.info("indexing the docs in VectorStoreIndex")
index = VectorStoreIndex.from_documents(documents=_docs, storage_context=storage_context, show_progress=True)

return RAGWorkflowWithRetryGuidelineQueryEngine(index=index, timeout=120.0)
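For quick iteration, the workflow can also be exercised directly, without the control plane. A sketch assuming the module is named `retry_guideline_query_engine_workflow` (as imported by the deploy script) and that Qdrant, Ollama, and `.env` are in place:

```python
# Sketch: run the workflow standalone, bypassing llama-deploy.
# Assumes .env is populated and Qdrant/Ollama are reachable.
import asyncio

from retry_guideline_query_engine_workflow import (
    build_rag_workflow_with_retry_guideline_query_engine,
)


async def main() -> None:
    workflow = build_rag_workflow_with_retry_guideline_query_engine()
    # StartEvent fields are passed to run() as keyword arguments
    result = await workflow.run(query="Summarize the indexed documents.", no_of_retries=2)
    print(result)


asyncio.run(main())
```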
setup.py (2 changes: 1 addition & 1 deletion)
@@ -2,7 +2,7 @@

 setup(
     name='bootstrap-rag',
-    version='0.0.2',
+    version='0.0.3',
     packages=find_packages(),
     include_package_data=True,
     install_requires=[