Adding project created for content about Building a recipe search with Elasticsearch (#350)

* This PR contains scripts used for the article Building a recipe search with Elasticsearch

* fixing code

* fixing reformatted files
andreluiz1987 authored Nov 7, 2024
1 parent 0c4d06f commit 4f92c8f
Showing 9 changed files with 12,663 additions and 0 deletions.
README.md
@@ -0,0 +1,66 @@
# Building a Recipe Search with Elasticsearch

This project demonstrates how to implement semantic search using Elastic's
ELSER model and compares its results with a traditional lexical search. The setup uses a cluster created in Elastic Cloud, which simplifies the use of ELSER and speeds up development.

> **Tip:** To learn more about Elastic Cloud and how to use it, visit: [https://www.elastic.co/pt/cloud](https://www.elastic.co/pt/cloud)

## Project Objectives

1. **Configure the Elasticsearch infrastructure** to support semantic and lexical search indexes.
2. **Ingest data**: use Python scripts to populate the indexes with grocery product data.
3. **Compare search types**: run searches against both indexes and display the results for comparison.

## Prerequisites

- **Elasticsearch v8.15** (recommended): required for ELSER and the `semantic_text` field type.
- **Python 3.x**: Required to run the ingestion and search scripts.
- **Python Libraries**: Required libraries are listed in the `requirements.txt` file.

To install the dependencies, use the following command:

```bash
pip install -r requirements.txt
```
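
The scripts connect to Elastic Cloud through `elasticsearch_connection.py`, which reads credentials from a `config.yml` file in the project root. A minimal sketch with placeholder values (use the Cloud ID and API key from your own deployment):

```yaml
# config.yml — placeholders; replace with your deployment's credentials
cloud_id: "<your-cloud-id>"
api_key: "<your-api-key>"
```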

## Creating the Indexes

To create the semantic and lexical search indexes, run the following scripts:

### Semantic Index

```bash
python infra.py
```

### Lexical Index

```bash
python infra_lexical_index.py
```

These scripts will automatically configure the indexes in Elasticsearch.
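
To confirm the result, a quick check with the project's connection helper should show both indexes (a minimal sketch; the index names are the ones defined in `infra.py` and `infra_lexical_index.py`):

```python
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()

# Both should print True once the infra scripts have run.
print(bool(client.indices.exists(index="grocery-catalog-elser")))
print(bool(client.indices.exists(index="grocery-catalog")))
```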

## Data Ingestion

To ingest the data into the indexes, use the commands below:

### Ingest Data into the Semantic Index

```bash
python ingestion.py
```

### Ingest Data into the Lexical Index

```bash
python ingestion_lexical_index.py
```
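
Once ingestion finishes, the document counts in both indexes should match the number of records in `files/output.json`. A minimal sketch using the count API (counts may take a moment to reflect recent writes until the indexes refresh):

```python
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()

for index in ["grocery-catalog-elser", "grocery-catalog"]:
    # The count API reports how many documents each index holds.
    print(index, client.count(index=index)["count"])
```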

## Search

To run searches and retrieve results from both the semantic and lexical indexes,
run the following command:

```bash
python search.py
```

This script performs searches in both indexes and displays the results in the console,
making it easy to compare the two approaches.
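
For reference, the two request bodies that `search.py` sends differ only in the query clause; stripped to their essentials, they look like this (a sketch using the sample term from the script):

```python
# Semantic query against the semantic_text field (ELSER index)
semantic_query = {
    "semantic": {"field": "description_embedding", "query": "seafood for grilling"}
}

# Lexical BM25 query over the plain text fields
lexical_query = {
    "multi_match": {"query": "seafood for grilling", "fields": ["name", "description"]}
}
```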

elasticsearch_connection.py
@@ -0,0 +1,25 @@
import yaml
from elasticsearch import Elasticsearch, AsyncElasticsearch


class ElasticsearchConnection:
    """Wraps the Elastic Cloud connection settings loaded from a YAML file."""

    def __init__(self, config_file="config.yml"):
        # Load cloud_id and api_key once and reuse them for both clients.
        with open(config_file, "r") as f:
            self.config = yaml.safe_load(f)
        self.client = Elasticsearch(
            cloud_id=self.config["cloud_id"], api_key=self.config["api_key"]
        )

    def get_client(self):
        return self.client

    def get_async_client(self):
        # A longer timeout accommodates bulk indexing requests.
        return AsyncElasticsearch(
            cloud_id=self.config["cloud_id"],
            api_key=self.config["api_key"],
            request_timeout=240,
        )

files/output.json (large diffs are not rendered by default)

infra.py
@@ -0,0 +1,44 @@
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()


def create_index_embedding():
    # Create the semantic index; description is copied into a semantic_text
    # field backed by the ELSER inference endpoint defined below.
    response = client.indices.create(
        index="grocery-catalog-elser",
        mappings={
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": "text"},
                "description": {"type": "text", "copy_to": "description_embedding"},
                "category": {"type": "keyword"},
                "brand": {"type": "keyword"},
                "price": {"type": "float"},
                "unit": {"type": "keyword"},
                "description_embedding": {
                    "type": "semantic_text",
                    "inference_id": "elser_embeddings",
                },
            }
        },
    )
    print(response)


def create_inference():
    # Register an ELSER sparse-embedding inference endpoint.
    response = client.inference.put(
        inference_id="elser_embeddings",
        task_type="sparse_embedding",
        body={
            "service": "elser",
            "service_settings": {"num_allocations": 1, "num_threads": 1},
        },
    )
    print(response)


if __name__ == "__main__":
    # The inference endpoint must exist before the index that references it.
    create_inference()
    create_index_embedding()
infra_lexical_index.py
@@ -0,0 +1,25 @@
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()


def create_index():
    # Create the lexical index: plain text/keyword fields, no embeddings.
    response = client.indices.create(
        index="grocery-catalog",
        mappings={
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": "text"},
                "description": {"type": "text"},
                "category": {"type": "keyword"},
                "brand": {"type": "keyword"},
                "price": {"type": "float"},
                "unit": {"type": "keyword"},
            }
        },
    )
    print(response)


if __name__ == "__main__":
    create_index()
ingestion.py
@@ -0,0 +1,47 @@
import asyncio
import json

from elasticsearch import helpers

from elasticsearch_connection import ElasticsearchConnection

async_client = ElasticsearchConnection().get_async_client()


def partition_list(lst, chunk_size):
    # Split the document list into chunks so each bulk request stays small.
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


async def index_data():
    # Load the source documents and wrap each one in a bulk action.
    with open("files/output.json", "r") as file:
        data_json = json.load(file)
    documents = []
    for doc in data_json:
        documents.append(
            {
                "_index": "grocery-catalog-elser",
                "_source": doc,
            }
        )

    # Index in batches of 500 documents.
    for i, partition in enumerate(partition_list(documents, 500)):
        print(f"partition {i + 1}")
        await async_bulk_indexing(async_client, partition)


async def async_bulk_indexing(client, documents):
    success, failed = await helpers.async_bulk(client, documents)
    print(
        f"Successfully indexed {success} documents. Failed to index {len(failed)} documents."
    )


async def main():
    await index_data()
    # Close the async client to release the underlying HTTP session.
    await async_client.close()


asyncio.run(main())
ingestion_lexical_index.py
@@ -0,0 +1,47 @@
import asyncio
import json

from elasticsearch import helpers

from elasticsearch_connection import ElasticsearchConnection

async_client = ElasticsearchConnection().get_async_client()


def partition_list(lst, chunk_size):
    # Split the document list into chunks so each bulk request stays small.
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


async def index_data():
    # Load the source documents and wrap each one in a bulk action.
    with open("files/output.json", "r") as file:
        data_json = json.load(file)
    documents = []
    for doc in data_json:
        documents.append(
            {
                "_index": "grocery-catalog",
                "_source": doc,
            }
        )

    # Index in batches of 500 documents.
    for i, partition in enumerate(partition_list(documents, 500)):
        print(f"partition {i + 1}")
        await async_bulk_indexing(async_client, partition)


async def async_bulk_indexing(client, documents):
    success, failed = await helpers.async_bulk(client, documents)
    print(
        f"Successfully indexed {success} documents. Failed to index {len(failed)} documents."
    )


async def main():
    await index_data()
    # Close the async client to release the underlying HTTP session.
    await async_client.close()


asyncio.run(main())
requirements.txt
@@ -0,0 +1,4 @@
elasticsearch
aiohttp
pyyaml
pandas
search.py
@@ -0,0 +1,104 @@
import pandas as pd

from elasticsearch_connection import ElasticsearchConnection

es_client = ElasticsearchConnection().get_client()

term = "seafood for grilling"
size = 5


def format_text(description, line_length=120):
    # Truncate the text to at most `line_length` words.
    words = description.split()
    if len(words) <= line_length:
        return description
    return " ".join(words[:line_length]) + "..."


def search_semantic(term):
    result = []
    # Semantic query against the ELSER-backed semantic_text field.
    response = es_client.search(
        index="grocery-catalog-elser",
        size=size,
        source_excludes="description_embedding",
        query={"semantic": {"field": "description_embedding", "query": term}},
    )

    for hit in response["hits"]["hits"]:
        result.append(
            {
                "score": hit["_score"],
                "name": format_text(hit["_source"]["name"], line_length=10),
                "description": format_text(hit["_source"]["description"]),
            }
        )
    return result


def search_lexical(term):
    result = []
    # Query the lexical index so the two approaches can be compared.
    response = es_client.search(
        index="grocery-catalog",
        size=size,
        query={"multi_match": {"query": term, "fields": ["name", "description"]}},
    )

    for hit in response["hits"]["hits"]:
        result.append(
            {
                "score": hit["_score"],
                "name": format_text(hit["_source"]["name"], line_length=10),
                "description": hit["_source"]["description"],
            }
        )
    return result


if __name__ == "__main__":
    rs1 = search_semantic(term)
    rs2 = search_lexical(term)

    # Build one DataFrame per search type, handling empty result sets.
    df1 = (
        pd.DataFrame(rs1)[["name", "score"]]
        if rs1
        else pd.DataFrame(columns=["name", "score"])
    )
    df1["Search Type"] = "Semantic"

    df2 = (
        pd.DataFrame(rs2)[["name", "score"]]
        if rs2
        else pd.DataFrame(columns=["name", "score"])
    )
    df2["Search Type"] = "Lexical"

    tabela = pd.concat([df1, df2], axis=0).reset_index(drop=True)
    tabela = tabela[["Search Type", "name", "score"]]
    tabela.columns = ["Search Type", "Name", "Score"]

    # Pad the columns so the console output lines up.
    tabela["Name"] = tabela["Name"].astype(str).str.ljust(15)
    tabela["Score"] = tabela["Score"].astype(str).str.ljust(5)

    print(tabela.to_string(index=False))
