From 61816e076ffb9cee2974a47a28911102290f0d6b Mon Sep 17 00:00:00 2001
From: Nathan Evans
Date: Tue, 10 Dec 2024 12:23:26 -0800
Subject: [PATCH] Migration notebook (#1492)

* Add migration notebook

* Update migration instructions

* Semver

* Rename item in relationships table

* Remove indexing vector store shim

* Remove query shims

* Remove columns from migrated data

* Format

* Add community parents
---
 .../patch-20241209225934573225.json           |   4 +
 docs/examples_notebooks/drift_search.ipynb    |  39 +--
 docs/examples_notebooks/index_migration.ipynb | 263 ++++++++++++++++++
 docs/examples_notebooks/local_search.ipynb    |   6 -
 .../graph-visualization.ipynb                 |   6 -
 graphrag/api/index.py                         |  24 --
 graphrag/api/query.py                         | 113 +-------
 graphrag/cli/query.py                         |   7 +-
 graphrag/config/models/llm_parameters.py      |   3 -
 graphrag/config/resolve_path.py               |   9 +
 graphrag/index/input/factory.py               |   4 -
 graphrag/index/run/run.py                     |   4 -
 graphrag/index/run/workflow.py                |   4 +-
 graphrag/index/update/incremental_index.py    |  20 +-
 graphrag/model/community_report.py            |   7 -
 graphrag/model/document.py                    |   5 -
 graphrag/model/text_unit.py                   |   5 -
 graphrag/query/indexer_adapters.py            |  26 +-
 graphrag/query/input/loaders/dfs.py           |  51 ----
 graphrag/utils/storage.py                     |  10 +-
 v1-breaking-changes.md                        |  24 +-
 21 files changed, 310 insertions(+), 324 deletions(-)
 create mode 100644 .semversioner/next-release/patch-20241209225934573225.json
 create mode 100644 docs/examples_notebooks/index_migration.ipynb

diff --git a/.semversioner/next-release/patch-20241209225934573225.json b/.semversioner/next-release/patch-20241209225934573225.json
new file mode 100644
index 000000000..45bde5edb
--- /dev/null
+++ b/.semversioner/next-release/patch-20241209225934573225.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Add migration notebook."
+}
diff --git a/docs/examples_notebooks/drift_search.ipynb b/docs/examples_notebooks/drift_search.ipynb
index c1b06a564..95e0115e9 100644
--- a/docs/examples_notebooks/drift_search.ipynb
+++ b/docs/examples_notebooks/drift_search.ipynb
@@ -171,9 +171,6 @@
     "    read_indexer_reports,\n",
     "    read_indexer_text_units,\n",
     ")\n",
-    "from graphrag.query.input.loaders.dfs import (\n",
-    "    store_entity_semantic_embeddings,\n",
-    ")\n",
     "from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
     "from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
     "from graphrag.query.llm.oai.typing import OpenaiApiType\n",
@@ -207,9 +204,6 @@
     "    collection_name=\"default-entity-description\",\n",
     ")\n",
     "description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
-    "entity_description_embeddings = store_entity_semantic_embeddings(\n",
-    "    entities=entities, vectorstore=description_embedding_store\n",
-    ")\n",
     "\n",
     "print(f\"Entity count: {len(entity_df)}\")\n",
     "entity_df.head()\n",
@@ -270,37 +264,16 @@
    }
   ],
   "source": [
-    "def embed_community_reports(\n",
+    "def read_community_reports(\n",
     "    input_dir: str,\n",
-    "    embedder: OpenAIEmbedding,\n",
     "    community_report_table: str = COMMUNITY_REPORT_TABLE,\n",
     "):\n",
-    "    \"\"\"Embeds the full content of the community reports and saves the DataFrame with embeddings to the output path.\"\"\"\n",
+    "    \"\"\"Read the community reports parquet from the input directory.\"\"\"\n",
     "    input_path = Path(input_dir) / f\"{community_report_table}.parquet\"\n",
-    "    output_path = Path(input_dir) / f\"{community_report_table}_with_embeddings.parquet\"\n",
-    "\n",
-    "    if not Path(output_path).exists():\n",
-    "        print(\"Embedding file not found. Computing community report embeddings...\")\n",
-    "\n",
-    "        report_df = pd.read_parquet(input_path)\n",
-    "\n",
-    "        if \"full_content\" not in report_df.columns:\n",
-    "            error_msg = f\"'full_content' column not found in {input_path}\"\n",
-    "            raise ValueError(error_msg)\n",
-    "\n",
-    "        report_df[\"full_content_embeddings\"] = report_df.loc[:, \"full_content\"].apply(\n",
-    "            lambda x: embedder.embed(x)\n",
-    "        )\n",
-    "\n",
-    "        # Save the DataFrame with embeddings to the output path\n",
-    "        report_df.to_parquet(output_path)\n",
-    "        print(f\"Embeddings saved to {output_path}\")\n",
-    "        return report_df\n",
-    "    print(f\"Embeddings file already exists at {output_path}\")\n",
-    "    return pd.read_parquet(output_path)\n",
+    "    return pd.read_parquet(input_path)\n",
     "\n",
     "\n",
-    "report_df = embed_community_reports(INPUT_DIR, text_embedder)\n",
+    "report_df = read_community_reports(INPUT_DIR)\n",
     "reports = read_indexer_reports(\n",
     "    report_df,\n",
     "    entity_df,\n",
@@ -321,7 +294,7 @@
     "    entities=entities,\n",
     "    relationships=relationships,\n",
     "    reports=reports,\n",
-    "    entity_text_embeddings=entity_description_embeddings,\n",
+    "    entity_text_embeddings=description_embedding_store,\n",
     "    text_units=text_units,\n",
     ")\n",
     "\n",
@@ -3172,7 +3145,7 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "graphrag-ta_-cxM1-py3.10",
+   "display_name": ".venv",
    "language": "python",
    "name": "python3"
   },
@@ -3186,7 +3159,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-   "version": "3.10.12"
+   "version": "3.11.9"
  }
 },
 "nbformat": 4,
diff --git a/docs/examples_notebooks/index_migration.ipynb b/docs/examples_notebooks/index_migration.ipynb
new file mode 100644
index 000000000..a0ba6ae47
--- /dev/null
+++ b/docs/examples_notebooks/index_migration.ipynb
@@ -0,0 +1,263 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Copyright (c) 2024 Microsoft Corporation.\n",
+    "# Licensed under the MIT License."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Index Migration\n",
+    "\n",
+    "This notebook updates older indexes to maintain data model parity with the latest versions of GraphRAG. If you have a pre-1.0 index and need to migrate without re-running the entire pipeline, you can use this notebook to update only the pieces necessary for alignment.\n",
+    "\n",
+    "NOTE: We recommend regenerating your settings.yml with the latest version of GraphRAG using `graphrag init`, and copying your LLM settings into it before running this notebook. This ensures that your config is aligned with the latest version for the migration, and that you have a default vector store config, which is now required (indexing will fail without one).\n",
+    "\n",
+    "WARNING: This will overwrite your parquet files; you may want to make a backup first!\n",
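+    "\n",
+    "For reference, here is a minimal sketch of the vector store block in settings.yml, using the same default values that the previous built-in back-compat shim applied. Exact fields may vary by version, so prefer whatever `graphrag init` generates:\n",
+    "\n",
+    "```yaml\n",
+    "embeddings:\n",
+    "  vector_store:\n",
+    "    type: lancedb\n",
+    "    db_uri: output/lancedb\n",
+    "    container_name: default\n",
+    "    overwrite: true\n",
+    "```"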
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# This is the directory that has your settings.yml\n",
+    "# NOTE: much older indexes may have been output with a timestamped directory\n",
+    "# if this is the case, you will need to make sure the storage.base_dir in settings.yml points to it correctly\n",
+    "PROJECT_DIRECTORY = \"\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pathlib import Path\n",
+    "\n",
+    "from graphrag.config.load_config import load_config\n",
+    "from graphrag.config.resolve_path import resolve_paths\n",
+    "from graphrag.index.create_pipeline_config import create_pipeline_config\n",
+    "from graphrag.storage.factory import create_storage\n",
+    "\n",
+    "# This first block does some config loading, path resolution, and translation that is normally done by the CLI/API when running a full workflow\n",
+    "config = load_config(Path(PROJECT_DIRECTORY))\n",
+    "resolve_paths(config)\n",
+    "pipeline_config = create_pipeline_config(config)\n",
+    "storage = create_storage(pipeline_config.storage)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def remove_columns(df, columns):\n",
+    "    \"\"\"Remove columns from a DataFrame, suppressing errors.\"\"\"\n",
+    "    df.drop(labels=columns, axis=1, errors=\"ignore\", inplace=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 63,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def get_community_parent(nodes):\n",
+    "    \"\"\"Compute the parent community using the node membership as a lookup.\"\"\"\n",
+    "    parent_mapping = nodes.loc[:, [\"level\", \"community\", \"title\"]]\n",
+    "    nodes = nodes.loc[:, [\"level\", \"community\", \"title\"]]\n",
+    "\n",
+    "    # Create a parent mapping by adding 1 to the level column\n",
+    "    parent_mapping[\"level\"] += 1  # Shift levels for parent relationship\n",
+    "    parent_mapping.rename(columns={\"community\": \"parent\"}, inplace=True)\n",
+    "\n",
+    "    # Merge the parent information back into the base DataFrame\n",
+    "    nodes = nodes.merge(parent_mapping, on=[\"level\", \"title\"], how=\"left\")\n",
+    "\n",
+    "    # Fill missing parents with -1 (default value)\n",
+    "    nodes[\"parent\"] = nodes[\"parent\"].fillna(-1).astype(int)\n",
+    "\n",
+    "    join = (\n",
+    "        nodes.groupby([\"community\", \"level\", \"parent\"])\n",
+    "        .agg({\"title\": list})\n",
+    "        .reset_index()\n",
+    "    )\n",
+    "    return join[join[\"community\"] > -1].loc[:, [\"community\", \"parent\"]]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 64,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from uuid import uuid4\n",
+    "\n",
+    "from graphrag.utils.storage import load_table_from_storage, write_table_to_storage\n",
+    "\n",
+    "# First we'll go through any parquet files that had model changes and update them\n",
+    "# The new data model may have removed excess columns as well, but we will only make the minimal changes required for compatibility\n",
+    "\n",
+    "final_documents = await load_table_from_storage(\n",
+    "    \"create_final_documents.parquet\", storage\n",
+    ")\n",
+    "final_text_units = await load_table_from_storage(\n",
+    "    \"create_final_text_units.parquet\", storage\n",
+    ")\n",
+    "final_entities = await load_table_from_storage(\"create_final_entities.parquet\", storage)\n",
+    "final_nodes = await load_table_from_storage(\"create_final_nodes.parquet\", storage)\n",
+    "final_relationships = await load_table_from_storage(\n",
+    "    \"create_final_relationships.parquet\", storage\n",
+    ")\n",
+    "final_communities = await load_table_from_storage(\n",
+    "    \"create_final_communities.parquet\", storage\n",
+    ")\n",
+    "final_community_reports = await load_table_from_storage(\n",
+    "    \"create_final_community_reports.parquet\", storage\n",
+    ")\n",
+    "\n",
+    "\n",
+    "# Documents: rename raw_content to text for consistency\n",
+    "if \"raw_content\" in final_documents.columns:\n",
+    "    final_documents.rename(columns={\"raw_content\": \"text\"}, inplace=True)\n",
+    "final_documents[\"human_readable_id\"] = final_documents.index + 1\n",
+    "\n",
+    "# Text units just get a human_readable_id for consistency\n",
+    "final_text_units[\"human_readable_id\"] = final_text_units.index + 1\n",
+    "\n",
+    "# We renamed \"name\" to \"title\" for consistency with the rest of the tables\n",
+    "if \"name\" in final_entities.columns:\n",
+    "    final_entities.rename(columns={\"name\": \"title\"}, inplace=True)\n",
+    "remove_columns(\n",
+    "    final_entities, [\"name_embedding\", \"graph_embedding\", \"description_embedding\"]\n",
+    ")\n",
+    "\n",
+    "# Final nodes uses community for joins, which is now an int everywhere\n",
+    "final_nodes[\"community\"] = final_nodes[\"community\"].fillna(-1)\n",
+    "final_nodes[\"community\"] = final_nodes[\"community\"].astype(int)\n",
+    "remove_columns(\n",
+    "    final_nodes,\n",
+    "    [\n",
+    "        \"type\",\n",
+    "        \"description\",\n",
+    "        \"source_id\",\n",
+    "        \"graph_embedding\",\n",
+    "        \"entity_type\",\n",
+    "        \"top_level_node_id\",\n",
+    "        \"size\",\n",
+    "    ],\n",
+    ")\n",
+    "\n",
+    "# Relationships renames \"rank\" to \"combined_degree\" to be clear what the default ranking is\n",
+    "if \"rank\" in final_relationships.columns:\n",
+    "    final_relationships.rename(columns={\"rank\": \"combined_degree\"}, inplace=True)\n",
+    "\n",
+    "\n",
+    "# Compute the parents for each community, to add to communities and reports\n",
+    "parent_df = get_community_parent(final_nodes)\n",
+    "\n",
+    "# Communities previously used the \"id\" field for the Leiden id, but we've moved this to the community field and use a UUID for id like the others\n",
+    "if \"community\" not in final_communities.columns:\n",
+    "    final_communities[\"community\"] = final_communities[\"id\"].astype(int)\n",
+    "    final_communities[\"human_readable_id\"] = final_communities[\"community\"]\n",
+    "    final_communities[\"id\"] = [str(uuid4()) for _ in range(len(final_communities))]\n",
+    "if \"parent\" not in final_communities.columns:\n",
+    "    final_communities = final_communities.merge(parent_df, on=\"community\", how=\"left\")\n",
+    "remove_columns(final_communities, [\"raw_community\"])\n",
+    "\n",
+    "# We need int for community and the human_readable_id copy for consistency\n",
+    "final_community_reports[\"community\"] = final_community_reports[\"community\"].astype(int)\n",
+    "final_community_reports[\"human_readable_id\"] = final_community_reports[\"community\"]\n",
+    "if \"parent\" not in final_community_reports.columns:\n",
+    "    final_community_reports = final_community_reports.merge(\n",
+    "        parent_df, on=\"community\", how=\"left\"\n",
+    "    )\n",
+    "\n",
+    "await write_table_to_storage(final_documents, \"create_final_documents.parquet\", storage)\n",
+    "await write_table_to_storage(\n",
+    "    final_text_units, \"create_final_text_units.parquet\", storage\n",
+    ")\n",
+    "await write_table_to_storage(final_entities, \"create_final_entities.parquet\", storage)\n",
+    "await write_table_to_storage(final_nodes, \"create_final_nodes.parquet\", storage)\n",
+    "await write_table_to_storage(\n",
+    "    final_relationships, \"create_final_relationships.parquet\", storage\n",
+    ")\n",
+    "await write_table_to_storage(\n",
+    "    final_communities, \"create_final_communities.parquet\", storage\n",
+    ")\n",
+    "await write_table_to_storage(\n",
+    "    final_community_reports, \"create_final_community_reports.parquet\", storage\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from datashaper import NoopVerbCallbacks\n",
+    "\n",
+    "from graphrag.cache.factory import create_cache\n",
+    "from graphrag.index.flows.generate_text_embeddings import generate_text_embeddings\n",
+    "\n",
+    "# We only need to re-run the embeddings workflow to ensure that embeddings for all required search fields are in place\n",
+    "# We'll construct the context and run this function flow directly to avoid everything else\n",
+    "\n",
+    "workflow = next(\n",
+    "    (x for x in pipeline_config.workflows if x.name == \"generate_text_embeddings\"), None\n",
+    ")\n",
+    "config = workflow.config\n",
+    "text_embed = config.get(\"text_embed\", {})\n",
+    "embedded_fields = config.get(\"embedded_fields\", {})\n",
+    "callbacks = NoopVerbCallbacks()\n",
+    "cache = create_cache(pipeline_config.cache, PROJECT_DIRECTORY)\n",
+    "\n",
+    "await generate_text_embeddings(\n",
+    "    final_documents=None,\n",
+    "    final_relationships=None,\n",
+    "    final_text_units=final_text_units,\n",
+    "    final_entities=final_entities,\n",
+    "    final_community_reports=final_community_reports,\n",
+    "    callbacks=callbacks,\n",
+    "    cache=cache,\n",
+    "    storage=storage,\n",
+    "    text_embed_config=text_embed,\n",
+    "    embedded_fields=embedded_fields,\n",
+    "    snapshot_embeddings_enabled=False,\n",
+    ")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.9"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/docs/examples_notebooks/local_search.ipynb b/docs/examples_notebooks/local_search.ipynb
index 8d0e85277..ded7d1e4e 100644
--- a/docs/examples_notebooks/local_search.ipynb
+++ b/docs/examples_notebooks/local_search.ipynb
@@ -29,9 +29,6 @@
     "    read_indexer_reports,\n",
     "    read_indexer_text_units,\n",
     ")\n",
-    "from graphrag.query.input.loaders.dfs import (\n",
-    "    store_entity_semantic_embeddings,\n",
-    ")\n",
     "from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
     "from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
     "from graphrag.query.llm.oai.typing import OpenaiApiType\n",
@@ -287,9 +284,6 @@
     "    collection_name=\"default-entity-description\",\n",
     ")\n",
     "description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
-    "entity_description_embeddings = store_entity_semantic_embeddings(\n",
-    "    entities=entities, vectorstore=description_embedding_store\n",
-    ")\n",
     "\n",
     "print(f\"Entity count: {len(entity_df)}\")\n",
     "entity_df.head()"
diff --git a/examples_notebooks/community_contrib/yfiles-jupyter-graphs/graph-visualization.ipynb b/examples_notebooks/community_contrib/yfiles-jupyter-graphs/graph-visualization.ipynb
index bbb696c89..d10ef948b 100644
--- a/examples_notebooks/community_contrib/yfiles-jupyter-graphs/graph-visualization.ipynb
+++ 
b/examples_notebooks/community_contrib/yfiles-jupyter-graphs/graph-visualization.ipynb @@ -38,9 +38,6 @@ " read_indexer_reports,\n", " read_indexer_text_units,\n", ")\n", - "from graphrag.query.input.loaders.dfs import (\n", - " store_entity_semantic_embeddings,\n", - ")\n", "from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n", "from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n", "from graphrag.query.llm.oai.typing import OpenaiApiType\n", @@ -302,9 +299,6 @@ " collection_name=\"default-entity-description\",\n", ")\n", "description_embedding_store.connect(db_uri=LANCEDB_URI)\n", - "entity_description_embeddings = store_entity_semantic_embeddings(\n", - " entities=entities, vectorstore=description_embedding_store\n", - ")\n", "covariate_df = pd.read_parquet(f\"{INPUT_DIR}/{COVARIATE_TABLE}.parquet\")\n", "claims = read_indexer_covariates(covariate_df)\n", "covariates = {\"claims\": claims}\n", diff --git a/graphrag/api/index.py b/graphrag/api/index.py index 372107a18..b24e4f8dc 100644 --- a/graphrag/api/index.py +++ b/graphrag/api/index.py @@ -8,8 +8,6 @@ Backwards compatibility is not guaranteed at this time. """ -from pathlib import Path - from datashaper import WorkflowCallbacks from graphrag.cache.noop_pipeline_cache import NoopPipelineCache @@ -20,7 +18,6 @@ from graphrag.index.run import run_pipeline_with_config from graphrag.index.typing import PipelineRunResult from graphrag.logging.base import ProgressReporter -from graphrag.vector_stores.factory import VectorStoreType async def build_index( @@ -59,8 +56,6 @@ async def build_index( msg = "Cannot resume and update a run at the same time." raise ValueError(msg) - config = _patch_vector_config(config) - pipeline_config = create_pipeline_config(config) pipeline_cache = ( NoopPipelineCache() if config.cache.type == CacheType.none is None else None @@ -88,22 +83,3 @@ async def build_index( progress_reporter.success(output.workflow) progress_reporter.info(str(output.result)) return outputs - - -def _patch_vector_config(config: GraphRagConfig): - """Back-compat patch to ensure a default vector store configuration.""" - if not config.embeddings.vector_store: - config.embeddings.vector_store = { - "type": "lancedb", - "db_uri": "output/lancedb", - "container_name": "default", - "overwrite": True, - } - # TODO: must update filepath of lancedb (if used) until the new config engine has been implemented - # TODO: remove the type ignore annotations below once the new config engine has been refactored - vector_store_type = config.embeddings.vector_store["type"] # type: ignore - if vector_store_type == VectorStoreType.LanceDB: - db_uri = config.embeddings.vector_store["db_uri"] # type: ignore - lancedb_dir = Path(config.root_dir).resolve() / db_uri - config.embeddings.vector_store["db_uri"] = str(lancedb_dir) # type: ignore - return config diff --git a/graphrag/api/query.py b/graphrag/api/query.py index ebe6bd22a..1b6a92a3e 100644 --- a/graphrag/api/query.py +++ b/graphrag/api/query.py @@ -48,7 +48,7 @@ from graphrag.utils.cli import redact from graphrag.utils.embeddings import create_collection_name from graphrag.vector_stores.base import BaseVectorStore -from graphrag.vector_stores.factory import VectorStoreFactory, VectorStoreType +from graphrag.vector_stores.factory import VectorStoreFactory reporter = PrintProgressReporter("") @@ -240,17 +240,7 @@ async def local_search( ------ TODO: Document any exceptions to expect. 
""" - config = _patch_vector_store(config, nodes, entities, community_level) - - # TODO: update filepath of lancedb (if used) until the new config engine has been implemented - # TODO: remove the type ignore annotations below once the new config engine has been refactored - vector_store_type = config.embeddings.vector_store.get("type") # type: ignore vector_store_args = config.embeddings.vector_store - if vector_store_type == VectorStoreType.LanceDB: - db_uri = config.embeddings.vector_store["db_uri"] # type: ignore - lancedb_dir = Path(config.root_dir).resolve() / db_uri - vector_store_args["db_uri"] = str(lancedb_dir) # type: ignore - reporter.info(f"Vector Store Args: {redact(vector_store_args)}") # type: ignore description_embedding_store = _get_embedding_store( @@ -316,17 +306,7 @@ async def local_search_streaming( ------ TODO: Document any exceptions to expect. """ - config = _patch_vector_store(config, nodes, entities, community_level) - - # TODO: must update filepath of lancedb (if used) until the new config engine has been implemented - # TODO: remove the type ignore annotations below once the new config engine has been refactored - vector_store_type = config.embeddings.vector_store.get("type") # type: ignore vector_store_args = config.embeddings.vector_store - if vector_store_type == VectorStoreType.LanceDB: - db_uri = config.embeddings.vector_store["db_uri"] # type: ignore - lancedb_dir = Path(config.root_dir).resolve() / db_uri - vector_store_args["db_uri"] = str(lancedb_dir) # type: ignore - reporter.info(f"Vector Store Args: {redact(vector_store_args)}") # type: ignore description_embedding_store = _get_embedding_store( @@ -399,19 +379,7 @@ async def drift_search( ------ TODO: Document any exceptions to expect. """ - config = _patch_vector_store( - config, nodes, entities, community_level, with_reports=community_reports - ) - - # TODO: update filepath of lancedb (if used) until the new config engine has been implemented - # TODO: remove the type ignore annotations below once the new config engine has been refactored - vector_store_type = config.embeddings.vector_store.get("type") # type: ignore vector_store_args = config.embeddings.vector_store - if vector_store_type == VectorStoreType.LanceDB: - db_uri = config.embeddings.vector_store["db_uri"] # type: ignore - lancedb_dir = Path(config.root_dir).resolve() / db_uri - vector_store_args["db_uri"] = str(lancedb_dir) # type: ignore - reporter.info(f"Vector Store Args: {redact(vector_store_args)}") # type: ignore description_embedding_store = _get_embedding_store( @@ -453,85 +421,6 @@ async def drift_search( return response, context_data -def _patch_vector_store( - config: GraphRagConfig, - nodes: pd.DataFrame, - entities: pd.DataFrame, - community_level: int, - with_reports: pd.DataFrame | None = None, -) -> GraphRagConfig: - # TODO: remove the following patch that checks for a vector_store prior to v1 release - # TODO: this is a backwards compatibility patch that injects the default vector_store settings into the config if it is not present - # Only applicable in situations involving a local vector_store (lancedb). The general idea: - # if vector_store not in config: - # 1. assume user is running local if vector_store is not in config - # 2. insert default vector_store in config - # 3 .create lancedb vector_store instance - # 4. 
upload vector embeddings from the input dataframes to the vector_store - if not config.embeddings.vector_store: - from graphrag.query.input.loaders.dfs import ( - store_entity_semantic_embeddings, - ) - from graphrag.vector_stores.lancedb import LanceDBVectorStore - - config.embeddings.vector_store = { - "type": "lancedb", - "db_uri": f"{Path(config.storage.base_dir)}/lancedb", - "container_name": "default", - "overwrite": True, - } - description_embedding_store = LanceDBVectorStore( - db_uri=config.embeddings.vector_store["db_uri"], - collection_name=create_collection_name( - config.embeddings.vector_store["container_name"], - entity_description_embedding, - ), - overwrite=config.embeddings.vector_store["overwrite"], - ) - description_embedding_store.connect( - db_uri=config.embeddings.vector_store["db_uri"] - ) - # dump embeddings from the entities list to the description_embedding_store - entities_ = read_indexer_entities(nodes, entities, community_level) - store_entity_semantic_embeddings( - entities=entities_, vectorstore=description_embedding_store - ) - - if with_reports is not None: - from graphrag.query.input.loaders.dfs import ( - store_reports_semantic_embeddings, - ) - from graphrag.vector_stores.lancedb import LanceDBVectorStore - - community_reports = with_reports - container_name = config.embeddings.vector_store["container_name"] - # Store report embeddings - reports = read_indexer_reports( - community_reports, - nodes, - community_level, - content_embedding_col="full_content_embedding", - config=config, - ) - - full_content_embedding_store = LanceDBVectorStore( - db_uri=config.embeddings.vector_store["db_uri"], - collection_name=create_collection_name( - container_name, community_full_content_embedding - ), - overwrite=config.embeddings.vector_store["overwrite"], - ) - full_content_embedding_store.connect( - db_uri=config.embeddings.vector_store["db_uri"] - ) - # dump embeddings from the reports list to the full_content_embedding_store - store_reports_semantic_embeddings( - reports=reports, vectorstore=full_content_embedding_store - ) - - return config - - def _get_embedding_store( config_args: dict, embedding_name: str, diff --git a/graphrag/cli/query.py b/graphrag/cli/query.py index f11dfd60c..1bf185a76 100644 --- a/graphrag/cli/query.py +++ b/graphrag/cli/query.py @@ -16,7 +16,7 @@ from graphrag.index.create_pipeline_config import create_pipeline_config from graphrag.logging.print_progress import PrintProgressReporter from graphrag.storage.factory import create_storage -from graphrag.utils.storage import _load_table_from_storage +from graphrag.utils.storage import load_table_from_storage reporter = PrintProgressReporter("") @@ -124,7 +124,6 @@ def run_local_search( config.storage.base_dir = str(data_dir) if data_dir else config.storage.base_dir resolve_paths(config) - # TODO remove optional create_final_entities_description_embeddings.parquet to delete backwards compatibility dataframe_dict = _resolve_parquet_files( config=config, parquet_list=[ @@ -270,7 +269,7 @@ def _resolve_parquet_files( for parquet_file in parquet_list: df_key = parquet_file.split(".")[0] df_value = asyncio.run( - _load_table_from_storage(name=parquet_file, storage=storage_obj) + load_table_from_storage(name=parquet_file, storage=storage_obj) ) dataframe_dict[df_key] = df_value @@ -281,7 +280,7 @@ def _resolve_parquet_files( df_key = optional_file.split(".")[0] if file_exists: df_value = asyncio.run( - _load_table_from_storage(name=optional_file, storage=storage_obj) + 
load_table_from_storage(name=optional_file, storage=storage_obj) ) dataframe_dict[df_key] = df_value else: diff --git a/graphrag/config/models/llm_parameters.py b/graphrag/config/models/llm_parameters.py index 300498ae7..5cf064230 100644 --- a/graphrag/config/models/llm_parameters.py +++ b/graphrag/config/models/llm_parameters.py @@ -24,9 +24,6 @@ class LLMParameters(BaseModel): description="The encoding model to use", default=defs.ENCODING_MODEL ) model: str = Field(description="The LLM model to use.", default=defs.LLM_MODEL) - embeddings_model: str | None = Field( - description="The embeddings model to use.", default=defs.EMBEDDING_MODEL - ) max_tokens: int | None = Field( description="The maximum number of tokens to generate.", default=defs.LLM_MAX_TOKENS, diff --git a/graphrag/config/resolve_path.py b/graphrag/config/resolve_path.py index 237c7f7ed..084508103 100644 --- a/graphrag/config/resolve_path.py +++ b/graphrag/config/resolve_path.py @@ -9,6 +9,7 @@ from graphrag.config.enums import ReportingType, StorageType from graphrag.config.models.graph_rag_config import GraphRagConfig +from graphrag.vector_stores.factory import VectorStoreType def _resolve_timestamp_path_with_value(path: str | Path, timestamp_value: str) -> Path: @@ -203,3 +204,11 @@ def resolve_paths( pattern_or_timestamp_value, ) ) + + # TODO: must update filepath of lancedb (if used) until the new config engine has been implemented + # TODO: remove the type ignore annotations below once the new config engine has been refactored + vector_store_type = config.embeddings.vector_store["type"] # type: ignore + if vector_store_type == VectorStoreType.LanceDB: + db_uri = config.embeddings.vector_store["db_uri"] # type: ignore + lancedb_dir = Path(config.root_dir).resolve() / db_uri + config.embeddings.vector_store["db_uri"] = str(lancedb_dir) # type: ignore diff --git a/graphrag/index/input/factory.py b/graphrag/index/input/factory.py index ee2f0a82c..11ebc18d0 100644 --- a/graphrag/index/input/factory.py +++ b/graphrag/index/input/factory.py @@ -39,10 +39,6 @@ async def create_input( log.info("loading input from root_dir=%s", config.base_dir) progress_reporter = progress_reporter or NullProgressReporter() - if config is None: - msg = "No input specified!" - raise ValueError(msg) - match config.type: case InputType.blob: log.info("using blob storage input") diff --git a/graphrag/index/run/run.py b/graphrag/index/run/run.py index 577203ba2..c251c7316 100644 --- a/graphrag/index/run/run.py +++ b/graphrag/index/run/run.py @@ -126,10 +126,6 @@ async def run_pipeline_with_config( ) workflows = workflows or config.workflows - if dataset is None: - msg = "No dataset provided!" 
- raise ValueError(msg) - if is_update_run and update_index_storage: delta_dataset = await get_delta_docs(dataset, storage) diff --git a/graphrag/index/run/workflow.py b/graphrag/index/run/workflow.py index 87c918537..ff80d7723 100644 --- a/graphrag/index/run/workflow.py +++ b/graphrag/index/run/workflow.py @@ -23,7 +23,7 @@ from graphrag.index.typing import PipelineRunResult from graphrag.logging.base import ProgressReporter from graphrag.storage.pipeline_storage import PipelineStorage -from graphrag.utils.storage import _load_table_from_storage +from graphrag.utils.storage import load_table_from_storage log = logging.getLogger(__name__) @@ -41,7 +41,7 @@ async def _inject_workflow_data_dependencies( for id in deps: workflow_id = f"workflow:{id}" try: - table = await _load_table_from_storage(f"{id}.parquet", storage) + table = await load_table_from_storage(f"{id}.parquet", storage) except ValueError: # our workflows allow for transient tables, and we avoid putting those in storage # however, we need to keep the table in the dependency list for proper execution order. diff --git a/graphrag/index/update/incremental_index.py b/graphrag/index/update/incremental_index.py index 582807201..04d4d8d19 100644 --- a/graphrag/index/update/incremental_index.py +++ b/graphrag/index/update/incremental_index.py @@ -25,7 +25,7 @@ from graphrag.index.update.relationships import _update_and_merge_relationships from graphrag.logging.print_progress import ProgressReporter from graphrag.storage.pipeline_storage import PipelineStorage -from graphrag.utils.storage import _load_table_from_storage +from graphrag.utils.storage import load_table_from_storage @dataclass @@ -61,7 +61,7 @@ async def get_delta_docs( InputDelta The input delta. With new inputs and deleted inputs. 
""" - final_docs = await _load_table_from_storage( + final_docs = await load_table_from_storage( "create_final_documents.parquet", storage ) @@ -171,7 +171,7 @@ async def _update_community_reports( dataframe_dict, storage, update_storage, community_id_mapping ): """Update the community reports output.""" - old_community_reports = await _load_table_from_storage( + old_community_reports = await load_table_from_storage( "create_final_community_reports.parquet", storage ) delta_community_reports = dataframe_dict["create_final_community_reports"] @@ -192,7 +192,7 @@ async def _update_communities( dataframe_dict, storage, update_storage, community_id_mapping ): """Update the communities output.""" - old_communities = await _load_table_from_storage( + old_communities = await load_table_from_storage( "create_final_communities.parquet", storage ) delta_communities = dataframe_dict["create_final_communities"] @@ -207,7 +207,7 @@ async def _update_communities( async def _update_nodes(dataframe_dict, storage, update_storage, merged_entities_df): """Update the nodes output.""" - old_nodes = await _load_table_from_storage("create_final_nodes.parquet", storage) + old_nodes = await load_table_from_storage("create_final_nodes.parquet", storage) delta_nodes = dataframe_dict["create_final_nodes"] merged_nodes, community_id_mapping = _merge_and_resolve_nodes( @@ -220,7 +220,7 @@ async def _update_nodes(dataframe_dict, storage, update_storage, merged_entities async def _update_covariates(dataframe_dict, storage, update_storage): """Update the covariates output.""" - old_covariates = await _load_table_from_storage( + old_covariates = await load_table_from_storage( "create_final_covariates.parquet", storage ) delta_covariates = dataframe_dict["create_final_covariates"] @@ -235,7 +235,7 @@ async def _update_text_units( dataframe_dict, storage, update_storage, entity_id_mapping ): """Update the text units output.""" - old_text_units = await _load_table_from_storage( + old_text_units = await load_table_from_storage( "create_final_text_units.parquet", storage ) delta_text_units = dataframe_dict["create_final_text_units"] @@ -253,7 +253,7 @@ async def _update_text_units( async def _update_relationships(dataframe_dict, storage, update_storage): """Update the relationships output.""" - old_relationships = await _load_table_from_storage( + old_relationships = await load_table_from_storage( "create_final_relationships.parquet", storage ) delta_relationships = dataframe_dict["create_final_relationships"] @@ -273,7 +273,7 @@ async def _update_entities( dataframe_dict, storage, update_storage, config, cache, callbacks ): """Update Final Entities output.""" - old_entities = await _load_table_from_storage( + old_entities = await load_table_from_storage( "create_final_entities.parquet", storage ) delta_entities = dataframe_dict["create_final_entities"] @@ -310,7 +310,7 @@ async def _concat_dataframes(name, dataframe_dict, storage, update_storage): storage : PipelineStorage The storage used to store the dataframes. 
""" - old_df = await _load_table_from_storage(f"{name}.parquet", storage) + old_df = await load_table_from_storage(f"{name}.parquet", storage) delta_df = dataframe_dict[name] # Merge the final documents diff --git a/graphrag/model/community_report.py b/graphrag/model/community_report.py index 9216fb68d..95834e70e 100644 --- a/graphrag/model/community_report.py +++ b/graphrag/model/community_report.py @@ -25,9 +25,6 @@ class CommunityReport(Named): rank: float | None = 1.0 """Rank of the report, used for sorting (optional). Higher means more important""" - summary_embedding: list[float] | None = None - """The semantic (i.e. text) embedding of the report summary (optional).""" - full_content_embedding: list[float] | None = None """The semantic (i.e. text) embedding of the full report content (optional).""" @@ -51,8 +48,6 @@ def from_dict( summary_key: str = "summary", full_content_key: str = "full_content", rank_key: str = "rank", - summary_embedding_key: str = "summary_embedding", - full_content_embedding_key: str = "full_content_embedding", attributes_key: str = "attributes", size_key: str = "size", period_key: str = "period", @@ -66,8 +61,6 @@ def from_dict( summary=d[summary_key], full_content=d[full_content_key], rank=d[rank_key], - summary_embedding=d.get(summary_embedding_key), - full_content_embedding=d.get(full_content_embedding_key), attributes=d.get(attributes_key), size=d.get(size_key), period=d.get(period_key), diff --git a/graphrag/model/document.py b/graphrag/model/document.py index ec2b2d452..57b77c11c 100644 --- a/graphrag/model/document.py +++ b/graphrag/model/document.py @@ -22,9 +22,6 @@ class Document(Named): text: str = "" """The raw text content of the document.""" - text_embedding: list[float] | None = None - """The semantic embedding for the document raw content (optional).""" - attributes: dict[str, Any] | None = None """A dictionary of structured attributes such as author, etc (optional).""" @@ -37,7 +34,6 @@ def from_dict( title_key: str = "title", type_key: str = "type", text_key: str = "text", - text_embedding_key: str = "text_embedding", text_units_key: str = "text_units", attributes_key: str = "attributes", ) -> "Document": @@ -48,7 +44,6 @@ def from_dict( title=d[title_key], type=d.get(type_key, "text"), text=d[text_key], - text_embedding=d.get(text_embedding_key), text_unit_ids=d.get(text_units_key, []), attributes=d.get(attributes_key), ) diff --git a/graphrag/model/text_unit.py b/graphrag/model/text_unit.py index 4ad3b9e8d..e649786a6 100644 --- a/graphrag/model/text_unit.py +++ b/graphrag/model/text_unit.py @@ -16,9 +16,6 @@ class TextUnit(Identified): text: str """The text of the unit.""" - text_embedding: list[float] | None = None - """The text embedding for the text unit (optional).""" - entity_ids: list[str] | None = None """List of entity IDs related to the text unit (optional).""" @@ -44,7 +41,6 @@ def from_dict( id_key: str = "id", short_id_key: str = "human_readable_id", text_key: str = "text", - text_embedding_key: str = "text_embedding", entities_key: str = "entity_ids", relationships_key: str = "relationship_ids", covariates_key: str = "covariate_ids", @@ -57,7 +53,6 @@ def from_dict( id=d[id_key], short_id=d.get(short_id_key), text=d[text_key], - text_embedding=d.get(text_embedding_key), entity_ids=d.get(entities_key), relationship_ids=d.get(relationships_key), covariate_ids=d.get(covariates_key), diff --git a/graphrag/query/indexer_adapters.py b/graphrag/query/indexer_adapters.py index 80d9c8a7d..0a8ff4d48 100644 --- 
a/graphrag/query/indexer_adapters.py +++ b/graphrag/query/indexer_adapters.py @@ -63,17 +63,10 @@ def read_indexer_covariates(final_covariates: pd.DataFrame) -> list[Covariate]: def read_indexer_relationships(final_relationships: pd.DataFrame) -> list[Relationship]: """Read in the Relationships from the raw indexing outputs.""" - # rank is for back-compat with older indexes - # TODO: remove for 1.0 - rank_col = ( - "combined_degree" - if "combined_degree" in final_relationships.columns - else "rank" - ) return read_relationships( df=final_relationships, short_id_col="human_readable_id", - rank_col=rank_col, + rank_col="combined_degree", description_embedding_col=None, attributes_cols=None, ) @@ -106,10 +99,6 @@ def read_indexer_reports( nodes_df = nodes_df.groupby(["title"]).agg({"community": "max"}).reset_index() filtered_community_df = nodes_df["community"].drop_duplicates() - # todo: pre 1.0 back-compat where community was a string - reports_df.loc[:, "community"] = reports_df["community"].fillna(-1) - reports_df.loc[:, "community"] = reports_df["community"].astype(int) - reports_df = reports_df.merge( filtered_community_df, on="community", how="inner" ) @@ -127,7 +116,6 @@ def read_indexer_reports( df=reports_df, id_col="id", short_id_col="community", - summary_embedding_col=None, content_embedding_col=content_embedding_col, ) @@ -155,10 +143,6 @@ def read_indexer_entities( nodes_df = cast("pd.DataFrame", nodes_df[["id", "degree", "community"]]) - nodes_df["community"] = nodes_df["community"].fillna(-1) - nodes_df["community"] = nodes_df["community"].astype(int) - nodes_df["degree"] = nodes_df["degree"].astype(int) - # group entities by id and degree and remove duplicated community IDs nodes_df = nodes_df.groupby(["id", "degree"]).agg({"community": set}).reset_index() nodes_df["community"] = nodes_df["community"].apply(lambda x: [str(i) for i in x]) @@ -166,10 +150,6 @@ def read_indexer_entities( subset=["id"] ) - # todo: pre 1.0 back-compat where title was name - if "title" not in final_df.columns: - final_df["title"] = final_df["name"] - # read entity dataframe to knowledge model objects return read_entities( df=final_df, @@ -199,10 +179,6 @@ def read_indexer_communities( nodes_df = final_nodes reports_df = final_community_reports - # todo: pre 1.0 back-compat! 
- if "community" not in communities_df.columns: - communities_df["community"] = communities_df["id"] - # ensure communities matches community reports missing_reports = communities_df[ ~communities_df.community.isin(reports_df.community.unique()) diff --git a/graphrag/query/input/loaders/dfs.py b/graphrag/query/input/loaders/dfs.py index 55af4c2d3..097a62895 100644 --- a/graphrag/query/input/loaders/dfs.py +++ b/graphrag/query/input/loaders/dfs.py @@ -19,7 +19,6 @@ to_optional_str, to_str, ) -from graphrag.vector_stores.base import BaseVectorStore, VectorStoreDocument def read_entities( @@ -62,50 +61,6 @@ def read_entities( return entities -def store_entity_semantic_embeddings( - entities: list[Entity], - vectorstore: BaseVectorStore, -) -> BaseVectorStore: - """Store entity semantic embeddings in a vectorstore.""" - documents = [ - VectorStoreDocument( - id=entity.id, - text=entity.description, - vector=entity.description_embedding, - attributes=( - {"title": entity.title, **entity.attributes} - if entity.attributes - else {"title": entity.title} - ), - ) - for entity in entities - ] - vectorstore.load_documents(documents=documents) - return vectorstore - - -def store_reports_semantic_embeddings( - reports: list[CommunityReport], - vectorstore: BaseVectorStore, -) -> BaseVectorStore: - """Store entity semantic embeddings in a vectorstore.""" - documents = [ - VectorStoreDocument( - id=report.id, - text=report.full_content, - vector=report.full_content_embedding, - attributes=( - {"title": report.title, **report.attributes} - if report.attributes - else {"title": report.title} - ), - ) - for report in reports - ] - vectorstore.load_documents(documents=documents) - return vectorstore - - def read_relationships( df: pd.DataFrame, id_col: str = "id", @@ -219,7 +174,6 @@ def read_community_reports( summary_col: str = "summary", content_col: str = "full_content", rank_col: str | None = "rank", - summary_embedding_col: str | None = "summary_embedding", content_embedding_col: str | None = "full_content_embedding", attributes_cols: list[str] | None = None, ) -> list[CommunityReport]: @@ -234,9 +188,6 @@ def read_community_reports( summary=to_str(row, summary_col), full_content=to_str(row, content_col), rank=to_optional_float(row, rank_col), - summary_embedding=to_optional_list( - row, summary_embedding_col, item_type=float - ), full_content_embedding=to_optional_list( row, content_embedding_col, item_type=float ), @@ -259,7 +210,6 @@ def read_text_units( covariates_col: str | None = "covariate_ids", tokens_col: str | None = "n_tokens", document_ids_col: str | None = "document_ids", - embedding_col: str | None = "text_embedding", attributes_cols: list[str] | None = None, ) -> list[TextUnit]: """Read text units from a dataframe.""" @@ -274,7 +224,6 @@ def read_text_units( covariate_ids=to_optional_dict( row, covariates_col, key_type=str, value_type=str ), - text_embedding=to_optional_list(row, embedding_col, item_type=float), # type: ignore n_tokens=to_optional_int(row, tokens_col), document_ids=to_optional_list(row, document_ids_col, item_type=str), attributes=( diff --git a/graphrag/utils/storage.py b/graphrag/utils/storage.py index ed2d5b1cb..9c8839b33 100644 --- a/graphrag/utils/storage.py +++ b/graphrag/utils/storage.py @@ -13,7 +13,8 @@ log = logging.getLogger(__name__) -async def _load_table_from_storage(name: str, storage: PipelineStorage) -> pd.DataFrame: +async def load_table_from_storage(name: str, storage: PipelineStorage) -> pd.DataFrame: + """Load a parquet from the storage instance.""" 
    if not await storage.has(name):
        msg = f"Could not find {name} in storage!"
        raise ValueError(msg)
@@ -23,3 +24,10 @@ async def _load_table_from_storage(name: str, storage: PipelineStorage) -> pd.Da
     except Exception:
         log.exception("error loading table from storage: %s", name)
         raise
+
+
+async def write_table_to_storage(
+    table: pd.DataFrame, name: str, storage: PipelineStorage
+) -> None:
+    """Write a table to storage."""
+    await storage.set(name, table.to_parquet())
diff --git a/v1-breaking-changes.md b/v1-breaking-changes.md
index 1905980cc..a4a3a2012 100644
--- a/v1-breaking-changes.md
+++ b/v1-breaking-changes.md
@@ -1,36 +1,16 @@
 # GraphRAG Data Model and Config Breaking Changes
 
-As we worked toward a cleaner codebase, data model, and configuration for the v1 release, we made a few changes that can break older indexes. During the development process we left shims in place to account for these changes, so that all old indexes will work up until v1.0. However, with the release of 1.0 we are removing these shims to allow the codebase to move forward without the legacy code elements. This should be a fairly painless process for most users: because we aggressively use a cache for LLM calls, re-running an index over the top of a previous one should be very low (or no) cost. Therefore, our standard migration recommendation is as follows:
+As we worked toward a cleaner codebase, data model, and configuration for the v1 release, we made a few changes that can break older indexes. During the development process we left shims in place to account for these changes, so that all old indexes will work up until v1.0. However, with the release of 1.0 we are removing these shims to allow the codebase to move forward without the legacy code elements. We are providing a migration notebook, so this process should be fairly painless for most users:
 
 1. Rename or move your settings.yml file to back it up.
 2. Re-run `graphrag init` to generate a new default settings.yml.
 3. Open your old settings.yml and copy any critical settings that you changed. For most people this is likely only the LLM and embedding config.
-4. Re-run `graphrag index`. This will re-execute the standard pipeline, using the cache for any LLM calls that it can. The output parquet tables will be in the latest format.
+4. Run the notebook here: [index_migration.ipynb](./docs/examples_notebooks/index_migration.ipynb)
 
 Note that one of the new requirements is that we write embeddings to a vector store during indexing. By default, this uses a local lancedb instance. When you re-generate the default config, a block will be added to reflect this. If you need to write to Azure AI Search instead, we recommend updating these settings before you index, so you don't need to do a separate vector ingest.
 
 All of the breaking changes listed below are accounted for in the four steps above.
 
-## What if I don't have a cache available?
-
-If you no longer have your original GraphRAG cache, you can manually update your index. The most important aspect is ensuring that you have the required embeddings stored. If you already have your embeddings in a vector store, much of this can be avoided.
-
-Parquet changes:
-- The `create_final_entities.name` field has been renamed to `create_final_entities.title` for consistency with the other tables. Use your parquet editor of choice to fix this.
-- The `create_final_communities.id` field has been renamed to `create_final_communities.community` so that `id` can be repurposed for a UUID like the other tables. 
Use your parquet editor of choice to copy and rename this. You can copy it to leave the `id` field in place, or use a tool such as pandas to give each community a new UUID in the `id` field. (We join on the `community` field internally, so `id` can be effectively ignored). - -Embeddings changes: -- For Local Search, you need to have the entity.description embeddings in a vector store -- For DRIFT Search, you need the community.full_content embeddings in a vector store -- If you are only using Global search, you do not need any embeddings - -The easiest way to get both of those is to run the pipeline with all workflows skipped except for `generate_embeddings`, which will embed those fields and write them to a vector store directly. Using a newer config file that has the embeddings.vector_store block: - -- Set the `skip_workflows` value to [create_base_entity_graph, create_base_text_units, create_final_text_units, create_final_community_reports, create_final_nodes, create_final_relationships, create_final_documents, create_final_covariates, create_final_entities, create_final_communities] -- Re-run `graphrag index` - -What this does is run the pipeline, but skip over all of the usual artifact generation - the only workflow that is not skipped is the one that generates all default (or otherwise configured) embeddings. - ## Updated data model - We have streamlined the data model of the index in a few small ways to align tables more consistently and remove redundant content. Notably: