General improvements to the iterative evaluation notebook (#40)
Feel free to review this PR for improvements to add to our backlog, but
merge as-is, since I'll be using this for a demo Thursday evening.

This PR contains a couple of improvements to the evaluation notebook and
pipelines:
- Update for release 0.8.0
- Include feedback from Caroline and me
- Update README to point to different notebooks

`utils.py` in particular can probably use some cleanup after the demo :)
RobbeSneyders authored Dec 14, 2023

1 parent 68b36a5 commit 14f0af0
Showing 10 changed files with 660 additions and 788 deletions.
67 changes: 16 additions & 51 deletions README.md
@@ -1,4 +1,4 @@
-# RAG (Retrieval Augmented Generation) Ingestion Pipeline
+# RAG (Retrieval Augmented Generation) Ingestion Pipeline Tuning

<p align="center">
<a href="https://github.com/ml6team/fondant">
@@ -10,69 +10,34 @@

## Introduction

-Retrieval Augmented Generation (RAG) harnesses the capabilities of large language models (LLM),
-combining
-information retrieval with text generation. While LLMs are excellent at generating
-grammatically correct text, they can sometimes produce inaccurate information, also known as
-hallucination. RAG systems address this issue by retrieving data from databases, incorporating the
-retrieved information, and instructing the model to answer questions using only the provided data.
-This approach enables LLMs to deliver reliable and current responses without requiring
-retraining, making them adaptable to evolving facts.
+This repository contains RAG-related pipelines built with
+[Fondant](https://github.com/ml6team/fondant), a hub and framework for easy and shareable data
+processing.

-<p align="center">
-<img src="art/rag_architecture.png" height="450px"/>
-</p>
-<p align="center">
-RAG System Architecture
-</p>
-
-More information on RAG system you can find on the following
-[blog post](https://blog.ml6.eu/leveraging-llms-on-your-domain-specific-knowledge-base-4441c8837b47)
-
-An essential building block of the RAG system is the smart retriever. To build a custom RAG system
-we have to make custom data searchable. Typically, semantic search is employed for this task.
-Documents are transformed into embeddings, which are subsequently stored in a vector database.
+## Available notebooks

-## Pipeline Overview
+### A simple RAG indexing pipeline

-The image below shows the entire pipeline and its workflow.
-Note, for demo purpose we are using a dataset which is available on Huggingface. If you want to use
-your own data, you have to adapt the initial LoadComponent. You could also customise the final
-writing component.
-If you want to store the embeddings in a different database, you have to implement your own
-WriteComponent.
+A [**notebook**](./src/pipeline.ipynb) with a simple Fondant pipeline to index your data into a
+RAG system.

+### Iterative tuning of a RAG indexing pipeline
+
-<p align="center">
-<img src="art/pipeline.png"/>
-</p>
-<p align="center">
-Pipeline Overview
-</p>

+A [**notebook**](./src/evaluation.ipynb) which iteratively runs a Fondant
+[indexing pipeline](./src/pipeline_index.py) and [evaluation pipeline](./src/pipeline_eval.py) with
+different parameters for comparison. You can inspect the data between every step to make
+informed choices on which parameters to try.

-There are 4 components in total, these are:
+### Auto-tuning of a RAG indexing pipeline

-- [Load from Huggingface Hub](https://github.com/ml6team/fondant/tree/main/components/load_from_hf_hub):
-  The pipeline begins by loading text data from a Parquet file, which serves as the
-  source for subsequent processing. For the minimal example we are using a dataset from Huggingface.
-- [Text Chunking](https://github.com/ml6team/fondant/tree/main/components/chunk_text): Text data is
-  chunked into manageable sections to prepare it for embedding. This
-  step
-  is crucial for performant RAG systems.
-- [Text Embedding](https://github.com/ml6team/fondant/tree/main/components/embed_text): We are using
-  a small HuggingFace model for the generation of text embeddings.
-  The `embed_text` component easily allows the usage of different models as well.
-- [Write to Weaviate](https://github.com/ml6team/fondant/tree/main/components/index_weaviate): The
-  final step of the pipeline involves writing the embedded text data to
-  a Weaviate database.
+> 🚧 Coming soon

## Getting started

> ⚠️ **Prerequisites:**
>
> - A Python version between 3.8 and 3.10 installed on your system.
-> - Docker installed and configured on your system.
+> - Docker and docker compose installed and configured on your system.
> - A GPU is recommended to run the model-based components of the pipeline.
### Cloning the repository
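In practice, the iterative-tuning notebook boils down to a loop over parameter sets: index, evaluate, store the parameters, then compare. The sketch below is assembled from the `create_pipeline` factories and `utils` helpers changed in this diff, not copied from the notebook; the `DockerRunner` import and `run()` call are assumed from the Fondant 0.8 API, and the swept parameters are hypothetical.

```python
import pipeline_eval
import pipeline_index
import utils
from fondant.pipeline.runner import DockerRunner  # assumed Fondant 0.8 API

base_path = "./data-dir"
runner = DockerRunner()

# Hypothetical sweep: one Weaviate class per chunking configuration.
for run_id, chunk_size in enumerate([256, 512], start=1):
    weaviate_class = f"Run{run_id}"

    # 1. Index the dataset with this configuration.
    runner.run(pipeline_index.create_pipeline(
        base_path=base_path,
        weaviate_class=weaviate_class,
        chunk_size=chunk_size,
    ))

    # 2. Evaluate retrieval against the freshly built index.
    runner.run(pipeline_eval.create_pipeline(
        base_path=base_path,
        weaviate_class=weaviate_class,
    ))

    # 3. Write this run's parameters next to its aggregated metrics.
    utils.store_results(
        "evaluation-pipeline",
        base_path=base_path,
        weaviate_url="",    # store_results drops this key before saving
        embed_api_key={},   # ... and this one, since it is a secret
        chunk_size=chunk_size,
    )

# 4. Compare all runs in a single dataframe.
print(utils.read_results("evaluation-pipeline", base_path))
```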
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
-fondant==0.8.dev6
+fondant==0.8.0
notebook==7.0.6
102 changes: 0 additions & 102 deletions src/README.md

This file was deleted.

7 changes: 3 additions & 4 deletions src/components/aggregate_eval_results/fondant_component.yaml
@@ -4,14 +4,13 @@ description: Component that aggregates results of the evaluation of the retrieve
image: ghcr.io/ml6team/aggregate_eval_results:dev

consumes: #TODO: add/retrieve metrics to consider
-  context_precision:
-    type: float32
-  context_relevancy:
-    type: float32
+  additionalProperties: true


produces:
  metric:
    type: string
  score:
    type: float32

+previous_index: ""
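With `additionalProperties: true`, the component no longer hard-codes which metric columns it consumes; the pipeline declares them when applying the component. The snippet below only illustrates the mapping that `pipeline_eval.py` (further down in this diff) now passes in, using the default metric list:

```python
import pyarrow as pa

evaluation_metrics = ["context_precision", "context_relevancy"]

# The consumes mapping handed to the aggregation component at apply time.
consumes = {metric: pa.float32() for metric in evaluation_metrics}
print(consumes)
# {'context_precision': DataType(float), 'context_relevancy': DataType(float)}
```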
11 changes: 5 additions & 6 deletions src/components/aggregate_eval_results/src/main.py
@@ -3,15 +3,14 @@


class AggregateResults(DaskTransformComponent):
-    def __init__(self, **kwargs) -> None:
-        return None
+    def __init__(self, consumes: dict, **kwargs):
+        self.consumes = consumes

    def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
-        metrics = list(dataframe.select_dtypes(["float", "int"]).columns)
+        metrics = list(self.consumes.keys())
        agg = dataframe[metrics].mean()
        agg_df = agg.to_frame(name="score")
        agg_df["metric"] = agg.index
-        agg_results_df = agg_df[["metric", "score"]]
-        agg_results_df = agg_results_df.reset_index(drop=True)
+        agg_df.index = agg_df.index.astype(str)

-        return agg_results_df
+        return agg_df
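To see what the reworked transform computes, here is the same logic run with plain pandas on a toy frame (the component operates on a Dask dataframe, but the API is the same; the metric columns and values here are hypothetical):

```python
import pandas as pd

# Toy per-question scores, as evaluate_ragas would produce them.
df = pd.DataFrame({
    "context_precision": [0.9, 0.7],
    "context_relevancy": [0.6, 0.8],
})

metrics = list(df.columns)  # the component now takes these from self.consumes
agg = df[metrics].mean()    # one mean score per metric
agg_df = agg.to_frame(name="score")
agg_df["metric"] = agg.index
agg_df.index = agg_df.index.astype(str)

print(agg_df)
#                    score             metric
# context_precision    0.8  context_precision
# context_relevancy    0.7  context_relevancy
```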
553 changes: 0 additions & 553 deletions src/eval_iter_loop.ipynb

This file was deleted.

534 changes: 534 additions & 0 deletions src/evaluation.ipynb

Large diffs are not rendered by default.

54 changes: 29 additions & 25 deletions src/pipeline_eval.py
@@ -1,41 +1,40 @@
"""Fondant pipeline to evaluate a RAG pipeline."""

import pyarrow as pa
from fondant.pipeline import Pipeline


-def create_pipeline(  # noqa: PLR0913
-    pipeline_dir: str = "./data-dir",
+def create_pipeline(
+    *,
+    base_path: str = "./data-dir",
+    weaviate_url="http://host.docker.internal:8080",
+    weaviate_class: str = "Pipeline1",
+    csv_dataset_uri: str = "/data/wikitext_1000_q.csv",
+    csv_separator: str = ";",
    embed_model_provider: str = "huggingface",
    embed_model: str = "all-MiniLM-L6-v2",
    embed_api_key: dict = {},
-    weaviate_url="http://host.docker.internal:8080",
-    weaviate_class_name: str = "Pipeline1",
+    # evaluation args
-    csv_dataset_uri: str = "/data/wikitext_1000_q.csv",
-    csv_column_separator: str = ";",
    question_column_name: str = "question",
-    top_k: int = 3,
-    module: str = "langchain.llms",
-    llm_name: str = "OpenAI",
-    llm_kwargs: dict = {"openai_api_key": ""},  # TODO if use Fondant CLI
-    metrics: list = ["context_precision", "context_relevancy"],
+    retrieval_top_k: int = 3,
+    evaluation_module: str = "langchain.llms",
+    evaluation_llm: str = "OpenAI",
+    evaluation_llm_kwargs: dict = {},
+    evaluation_metrics: list = ["context_precision", "context_relevancy"],
):
    """Create a Fondant pipeline based on the provided arguments."""
    evaluation_pipeline = Pipeline(
        name="evaluation-pipeline",
-        description="Pipeline to evaluate \
-        a RAG solution",
-        base_path=pipeline_dir,
+        description="Pipeline to evaluate a RAG solution",
+        base_path=base_path,
    )

    load_from_csv = evaluation_pipeline.read(
        "load_from_csv",
        arguments={
            "dataset_uri": csv_dataset_uri,
-            "column_separator": csv_column_separator,
-            "column_name_mapping": {question_column_name: "text"},
+            "column_separator": csv_separator,
        },
        produces={
-            "text": pa.string(),
+            "question": pa.string(),
        },
    )

@@ -46,29 +45,34 @@ def create_pipeline( # noqa: PLR0913
"model": embed_model,
"api_keys": embed_api_key,
},
consumes={
"text": "question",
},
)

retrieve_chunks = embed_text_op.apply(
"retrieve_from_weaviate",
arguments={
"weaviate_url": weaviate_url,
"class_name": weaviate_class_name,
"top_k": top_k,
"class_name": weaviate_class,
"top_k": retrieval_top_k,
},
cache=False,
)

retriever_eval = retrieve_chunks.apply(
"evaluate_ragas",
arguments={
"module": module,
"llm_name": llm_name,
"llm_kwargs": llm_kwargs,
"module": evaluation_module,
"llm_name": evaluation_llm,
"llm_kwargs": evaluation_llm_kwargs,
},
produces={metric: pa.float32() for metric in metrics},
produces={metric: pa.float32() for metric in evaluation_metrics},
)

retriever_eval.apply(
"components/aggregate_eval_results",
consumes={metric: pa.float32() for metric in evaluation_metrics},
)

return evaluation_pipeline
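The new keyword-only signature can be smoke-tested by just building the graph; no infrastructure is needed until the pipeline actually runs. A minimal sketch with placeholder values:

```python
from pipeline_eval import create_pipeline

# Building the pipeline object exercises the renamed keyword arguments;
# executing it would additionally need Docker, Weaviate, and an OpenAI key.
evaluation_pipeline = create_pipeline(
    base_path="./data-dir",
    weaviate_class="Pipeline1",
    evaluation_llm_kwargs={"openai_api_key": "sk-..."},  # placeholder
    evaluation_metrics=["context_precision", "context_relevancy"],
)
```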
27 changes: 13 additions & 14 deletions src/pipeline_index.py
@@ -3,33 +3,31 @@
from fondant.pipeline import Pipeline


-def create_pipeline(  # noqa: PLR0913
-    pipeline_dir: str = "./data-dir",
+def create_pipeline(
+    *,
+    base_path: str = "s3://sagemaker-fondant-artifacts-robbe/data",
+    n_rows_to_load: int = 1000,
+    weaviate_url: str = "http://host.docker.internal:8080",
+    weaviate_class: str = "Pipeline1",
+    weaviate_overwrite: bool = True,
    embed_model_provider: str = "huggingface",
    embed_model: str = "all-MiniLM-L6-v2",
    embed_api_key: dict = {},
-    weaviate_url: str = "http://host.docker.internal:8080",
-    weaviate_class_name: str = "Pipeline1",
-    overwrite: bool = True,
    # indexing args
-    hf_dataset_name: str = "wikitext@~parquet",
-    data_column_name: str = "text",
-    n_rows_to_load: int = 1000,
    chunk_size: int = 512,
    chunk_overlap: int = 32,
):
    """Create a Fondant pipeline based on the provided arguments."""
    indexing_pipeline = Pipeline(
        name="indexing-pipeline",
        description="Pipeline to prepare and process data for building a RAG solution",
-        base_path=pipeline_dir,  # The demo pipelines uses a local directory to store the data.
+        base_path=base_path,
    )

    text = indexing_pipeline.read(
        "load_from_hf_hub",
        arguments={
-            # Add arguments
-            "dataset_name": hf_dataset_name,
-            "column_name_mapping": {data_column_name: "text"},
+            "dataset_name": "wikitext@~parquet",
            "n_rows_to_load": n_rows_to_load,
        },
        produces={
@@ -58,9 +56,10 @@ def create_pipeline( # noqa: PLR0913
"index_weaviate",
arguments={
"weaviate_url": weaviate_url,
"class_name": weaviate_class_name,
"overwrite": overwrite,
"class_name": weaviate_class,
"overwrite": weaviate_overwrite,
},
cache=False,
)

return indexing_pipeline
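Since the indexing and evaluation factories now share the `weaviate_class` naming, pairing them per configuration is straightforward. A small sketch (class names and chunk sizes hypothetical):

```python
from pipeline_eval import create_pipeline as create_eval_pipeline
from pipeline_index import create_pipeline as create_index_pipeline

# One Weaviate class per configuration keeps each run's index isolated.
for weaviate_class, chunk_size in [("Run1", 256), ("Run2", 512)]:
    index_pipeline = create_index_pipeline(
        base_path="./data-dir",
        weaviate_class=weaviate_class,
        weaviate_overwrite=True,
        chunk_size=chunk_size,
    )
    eval_pipeline = create_eval_pipeline(
        base_path="./data-dir",
        weaviate_class=weaviate_class,
    )
    # Each pair would then be executed with a Fondant runner
    # (see the loop sketched under "Available notebooks" above).
```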
91 changes: 59 additions & 32 deletions src/utils.py
@@ -1,10 +1,14 @@
+import glob
+import json
import os
import socket
from datetime import datetime
from pathlib import Path

import pandas as pd

+COMPONENT_NAME = "aggregate_eval_results"
+

def get_host_ip():
    try:
@@ -28,37 +32,52 @@ def create_directory_if_not_exists(path):
    return str(p_base_path)


-# Store pipeline results
-def store_results(  # noqa: PLR0913
-    rag_results,
-    shared_args,
-    indexing_args,
-    evaluation_args,
-    index_pipeline_datetime,
-    eval_pipeline_datetime,
+def store_results(pipeline_name, **kwargs):
+    base_path = kwargs.pop("base_path")
+
+    del kwargs["weaviate_url"]
+    del kwargs["embed_api_key"]  # API key
+
+    run_dir = get_latest_run(base_path, pipeline_name)
+    param_file = os.path.join(run_dir, "params.json")
+    with open(param_file, "w") as f:
+        json.dump(kwargs, f)
+
+
+def read_results(
+    pipeline_name,
+    base_path,
):
-    pipeline_dir = shared_args["pipeline_dir"]
-    pipeline_name = "evaluation-pipeline"
-    component_name = "aggregate_eval_results"
+    runs = get_runs(base_path, pipeline_name)
+    dfs = []
+    for run in runs:
+        component_path = os.path.join(base_path, pipeline_name, run, COMPONENT_NAME)
+        params_file = os.path.join(component_path, "params.json")
+        if not os.path.exists(params_file):
+            continue
+        with open(params_file) as f:
+            params = json.load(f)

+        # Read params
+        params_df = pd.DataFrame(params, index=[0]).reset_index(drop=True)
+
-    results_dict = {}
-    results_dict["shared_args"] = shared_args
-    results_dict["indexing_datetime"] = index_pipeline_datetime
-    results_dict["indexing_args"] = indexing_args
-    results_dict["evaluation_args"] = evaluation_args
-    results_dict["evaluation_datetime"] = eval_pipeline_datetime
-    results_dict["agg_metrics"] = read_latest_data(
-        base_path=pipeline_dir,
-        pipeline_name=pipeline_name,
-        component_name=component_name,
-    )
+        # Read metrics
+        parquet_path = glob.glob(os.path.join(component_path, "*.parquet"))
+        metrics_df = pd.read_parquet(parquet_path).reset_index(drop=True)
+        metrics_df = pd.DataFrame(
+            dict(zip(metrics_df["metric"], metrics_df["score"])),
+            index=[0],
+        )

-    rag_results.append(results_dict)
+        # Join params and metrics
+        results_df = params_df.join(metrics_df)

-    return rag_results
+        dfs.append(results_df)
+
+    return pd.concat(dfs).reset_index(drop=True)


-def read_latest_data(base_path: str, pipeline_name: str, component_name: str):
+def get_runs(base_path: str, pipeline_name: str):
    # Specify the path to the 'data' directory
    data_directory = f"{base_path}/{pipeline_name}"
@@ -74,19 +93,27 @@ def read_latest_data(base_path: str, pipeline_name: str, component_name: str):
        entry for entry in subdirectories if entry.startswith(pipeline_name)
    ]
    # keep pipeline folders containing a parquet file in the component folder
-    valid_entries = [
+
+    return [
        folder
        for folder in valid_entries
-        if has_parquet_file(data_directory, folder, component_name)
+        if has_parquet_file(data_directory, folder, COMPONENT_NAME)
    ]


+def get_latest_run(base_path: str, pipeline_name: str):
+    runs = get_runs(base_path, pipeline_name)
+
-    # keep the latest folder
-    latest_folder = sorted(valid_entries, key=extract_timestamp, reverse=True)[0]
+    latest_run = sorted(runs, key=extract_timestamp, reverse=True)[0]
+    return os.path.join(base_path, pipeline_name, latest_run, COMPONENT_NAME)

-    # If a valid folder is found, proceed to read all Parquet files in the component folder
-    if latest_folder:
-        # Find the path to the component folder
-        component_folder = os.path.join(data_directory, latest_folder, component_name)

+def read_latest_data(base_path: str, pipeline_name: str):
+    component_folder = get_latest_run(base_path, pipeline_name)
+
+    # If a valid folder is found, proceed to read all Parquet files in the component folder
+    if component_folder:
        # Get a list of all Parquet files in the component folder
        parquet_files = [
            f for f in os.listdir(component_folder) if f.endswith(".parquet")

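Net effect of the utils rework: `store_results` writes a `params.json` into the latest run's `aggregate_eval_results` folder (dropping the Weaviate URL and the API key first), and `read_results` joins those parameters back onto the aggregated metrics, one row per run. Expected usage, with hypothetical values:

```python
import utils

base_path = "./data-dir"

# After an evaluation run finishes, persist the parameters that produced it.
utils.store_results(
    "evaluation-pipeline",
    base_path=base_path,
    weaviate_url="http://host.docker.internal:8080",  # deleted before saving
    embed_api_key={},                                 # deleted before saving
    chunk_size=512,                                   # hypothetical swept parameter
)

# Later, compare every stored run side by side.
results = utils.read_results("evaluation-pipeline", base_path)
print(results)
#    chunk_size  context_precision  context_relevancy
# 0         512                0.8                0.7
```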