Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenSearch VectorStore Component with Ingest and Search Capabilities #3799

Merged
merged 8 commits into from
Oct 1, 2024
Merged
91 changes: 35 additions & 56 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ langchain-unstructured = "^0.1.2"
pydantic-settings = "2.4.0"
ragstack-ai-knowledge-store = "^0.2.1"
duckduckgo-search = "^6.2.12"
opensearch-py = "^2.7.1"


[tool.poetry.group.dev.dependencies]
Expand Down
268 changes: 268 additions & 0 deletions src/backend/base/langflow/components/vectorstores/OpenSearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
import json
import traceback
from typing import TYPE_CHECKING, Any

from langchain_community.vectorstores import OpenSearchVectorSearch
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
BoolInput,
DataInput,
DropdownInput,
FloatInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data

if TYPE_CHECKING:
from langchain_community.vectorstores import OpenSearchVectorSearch


class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
    """OpenSearch Vector Store with advanced, customizable search capabilities.

    The component builds a LangChain ``OpenSearchVectorSearch`` from its
    inputs, optionally ingests the ``Data`` objects given in ``ingest_data``,
    and exposes vector search (``similarity``, ``similarity_score_threshold``,
    ``mmr``) as well as a raw JSON query sent directly to the OpenSearch
    client ("hybrid search").
    """

    display_name: str = "OpenSearch"
    description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/opensearch"
    name = "OpenSearch"
    icon = "OpenSearch"

    inputs = [
        StrInput(
            name="opensearch_url",
            display_name="OpenSearch URL",
            value="http://localhost:9200",
            info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in OpenSearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info=(
                "Enter a search query. Leave empty to retrieve all documents. "
                "If you need a more advanced search consider using Hybrid Search Query instead."
            ),
            value="",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "similarity_score_threshold", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="admin",
            advanced=True,
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="admin",
            advanced=True,
        ),
        BoolInput(
            name="use_ssl",
            display_name="Use SSL",
            value=True,
            advanced=True,
        ),
        BoolInput(
            name="verify_certs",
            display_name="Verify Certificates",
            value=False,
            advanced=True,
        ),
        MultilineInput(
            name="hybrid_search_query",
            display_name="Hybrid Search Query",
            value="",
            advanced=True,
            info=(
                "Provide a custom hybrid search query in JSON format. This allows you to combine "
                "vector similarity and keyword matching."
            ),
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> OpenSearchVectorSearch:
        """Build the OpenSearch vector store and ingest any pending data.

        Returns:
            The ``OpenSearchVectorSearch`` instance configured from this
            component's inputs.

        Raises:
            ImportError: If ``langchain_community`` cannot be imported.
            RuntimeError: If the vector store cannot be instantiated, or if
                adding the ingest data to it fails.
            ValueError: If ``ingest_data`` contains a non-``Data`` item.
        """
        try:
            from langchain_community.vectorstores import OpenSearchVectorSearch
        except ImportError as e:
            error_message = f"Failed to import required modules: {e}"
            logger.error(error_message)
            raise ImportError(error_message) from e

        try:
            opensearch = OpenSearchVectorSearch(
                index_name=self.index_name,
                embedding_function=self.embedding,
                opensearch_url=self.opensearch_url,
                http_auth=(self.username, self.password),
                use_ssl=self.use_ssl,
                verify_certs=self.verify_certs,
                # NOTE(review): hostname assertion and SSL warnings are always
                # disabled, independent of `verify_certs` -- confirm intended.
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
        except Exception as e:
            error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
            logger.error(error_message)
            raise RuntimeError(error_message) from e

        if self.ingest_data:
            self._add_documents_to_vector_store(opensearch)

        return opensearch

    def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
        """Convert ``ingest_data`` to LangChain documents and add them to the store.

        Nothing is added when there are no documents or when no embedding is
        configured.

        Args:
            vector_store: The vector store that receives the documents.

        Raises:
            ValueError: If an item of ``ingest_data`` is not a ``Data`` object.
            RuntimeError: If the vector store fails to add the documents.
        """
        documents = []
        for _input in self.ingest_data or []:
            if not isinstance(_input, Data):
                error_message = f"Expected Data object, got {type(_input)}"
                logger.error(error_message)
                raise ValueError(error_message)
            documents.append(_input.to_lc_document())

        if not documents:
            logger.debug("No documents to add to the Vector Store.")
            return

        if self.embedding is None:
            # Documents exist but cannot be embedded; say so instead of
            # claiming there was nothing to add.
            logger.warning(
                f"No embedding provided; skipping ingestion of {len(documents)} documents into the Vector Store."
            )
            return

        logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
        try:
            vector_store.add_documents(documents)
        except Exception as e:
            error_message = f"Error adding documents to Vector Store: {e}"
            logger.error(error_message)
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(error_message) from e

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Run a search against the vector store.

        When ``hybrid_search_query`` is non-empty it is parsed as JSON and
        sent as the request body to the OpenSearch client; ``query`` and
        ``search_type`` are ignored in that case. Otherwise the search is
        dispatched on ``search_type``. A missing query is passed to the
        vector store as an empty string.

        Args:
            query: The text to search for; ``None`` is treated as ``""``.

        Returns:
            A list of dicts with ``page_content`` and ``metadata`` keys, plus
            a ``score`` key for ``similarity_score_threshold`` searches.

        Raises:
            RuntimeError: On any failure, including invalid hybrid-query JSON
                and an unknown search type (the original error is chained).
        """
        try:
            vector_store = self.build_vector_store()

            query = query or ""

            # Guard against an unset (None) input before stripping.
            hybrid_query_text = (self.hybrid_search_query or "").strip()
            if hybrid_query_text:
                try:
                    hybrid_query = json.loads(hybrid_query_text)
                except json.JSONDecodeError as e:
                    error_message = f"Invalid hybrid search query JSON: {e}"
                    logger.error(error_message)
                    raise ValueError(error_message) from e

                results = vector_store.client.search(index=self.index_name, body=hybrid_query)

                processed_results = []
                for hit in results.get("hits", {}).get("hits", []):
                    source = hit.get("_source", {})
                    text = source.get("text", "")
                    # A null `metadata` field would break callers that call
                    # `.get` on it, so normalise it to an empty dict.
                    metadata = source.get("metadata") or {}

                    # Some documents nest the text under another "text" key.
                    if isinstance(text, dict):
                        text = text.get("text", "")

                    processed_results.append(
                        {
                            "page_content": text,
                            "metadata": metadata,
                        }
                    )
                return processed_results

            search_kwargs: dict[str, Any] = {"k": self.number_of_results}
            search_type = self.search_type.lower()

            if search_type == "similarity":
                results = vector_store.similarity_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
            if search_type == "similarity_score_threshold":
                search_kwargs["score_threshold"] = self.search_score_threshold
                results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
                return [
                    {
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                    }
                    for doc, score in results
                ]
            if search_type == "mmr":
                results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

            error_message = f"Invalid search type: {self.search_type}"
            logger.error(error_message)
            raise ValueError(error_message)

        except Exception as e:
            error_message = f"Error during search: {e}"
            logger.error(error_message)
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(error_message) from e

    def search_documents(self) -> list[Data]:
        """Search the vector store using ``search_input`` and wrap hits as ``Data``.

        The stripped ``search_input`` (or ``None`` when it is empty) is passed
        to :meth:`search`. Each result becomes a ``Data`` carrying the hit's
        text and the ``file_path`` entry of its metadata (``""`` if absent).
        The result list is also stored in ``self.status``.

        Returns:
            The retrieved documents as ``Data`` objects.

        Raises:
            RuntimeError: If the search or the conversion fails.
        """
        try:
            query = self.search_input.strip() if self.search_input else None
            results = self.search(query)
            retrieved_data = [
                Data(
                    # Tolerate results whose metadata is missing or None.
                    file_path=(result.get("metadata") or {}).get("file_path", ""),
                    text=result["page_content"],
                )
                for result in results
            ]
            self.status = retrieved_data
            return retrieved_data
        except Exception as e:
            error_message = f"Error during document search: {e}"
            logger.error(error_message)
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(error_message) from e
Loading