Adding project created for content about Building a recipe search with Elasticsearch #350

Merged
**README.md** (new file)
# Building a Recipe Search with Elasticsearch

This project demonstrates how to implement semantic search using Elastic's
ELSER and compare its results with a traditional lexical search. The setup uses a cluster created in Elastic Cloud, which simplifies deploying ELSER and speeds up development.

> **Tip:** To learn more about Elastic Cloud and how to use it, visit: [https://www.elastic.co/pt/cloud](https://www.elastic.co/pt/cloud)

## Project Objectives

1. **Configure the Elasticsearch infrastructure** to support semantic and lexical search indexes.
2. **Ingest data**: use Python scripts to populate the indexes with grocery product data.
3. **Compare search types**: run both searches and display the results for comparison.

## Prerequisites

- **Elasticsearch 8.15 or later** (recommended): required for ELSER and the `semantic_text` field type used by the semantic index.
- **Python 3.x**: Required to run the ingestion and search scripts.
- **Python Libraries**: Required libraries are listed in the `requirements.txt` file.

To install the dependencies, use the following command:

```bash
pip install -r requirements.txt
```
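
The connection helper (`elasticsearch_connection.py`, included in this change) reads Elastic Cloud credentials from a `config.yml` file in the project root. A minimal sketch with placeholder values you must replace:

```yaml
# config.yml — replace the placeholders with your Elastic Cloud credentials
cloud_id: "<your-deployment-cloud-id>"
api_key: "<your-api-key>"
```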

## Creating the Indexes

To create the semantic and lexical search indexes, run the following scripts:

### Semantic Index

```bash
python infra.py
```

### Lexical Index

```bash
python infra_lexical_index.py
```

These scripts will automatically configure the indexes in Elasticsearch.
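
If you want to confirm the setup before ingesting data, a quick sanity check is to ask Elasticsearch whether both indexes exist (a sketch reusing the project's connection helper):

```python
# Sanity-check sketch: verify that both indexes were created.
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()
for index in ("grocery-catalog-elser", "grocery-catalog"):
    print(index, bool(client.indices.exists(index=index)))
```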

## Data Ingestion

To ingest the grocery product data into the indexes, use the commands below:

### Ingest Data into the Semantic Index

```bash
python ingestion.py
```

### Ingest Data into the Lexical Index

```bash
python ingestion_lexical_index.py
```
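
Once both scripts finish, you can check how many documents landed in each index (a sketch; the refresh makes newly indexed documents visible to the count):

```python
# Sketch: report the document count in each index after ingestion.
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()
for index in ("grocery-catalog-elser", "grocery-catalog"):
    client.indices.refresh(index=index)
    print(index, client.count(index=index)["count"])
```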

## Search

To run the searches and retrieve results from both the semantic and lexical
indexes, run the following command:

```bash
python search.py
```

This script performs searches in both indexes and displays the results in the console,
making it easy to compare the two approaches.
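
Under the hood, `search.py` sends two different query bodies for the same search term, roughly as follows (a sketch; see the full script in this change):

```python
# Sketch of the two query shapes search.py compares.
term = "seafood for grilling"

# Semantic: queries the ELSER-backed semantic_text field.
semantic_query = {"semantic": {"field": "description_embedding", "query": term}}

# Lexical: classic BM25 relevance over the text fields.
lexical_query = {"multi_match": {"query": term, "fields": ["name", "description"]}}
```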

**elasticsearch_connection.py** (new file)
import yaml
from elasticsearch import Elasticsearch, AsyncElasticsearch


class ElasticsearchConnection:
    """Builds Elasticsearch clients from credentials stored in a YAML config file."""

    def __init__(self, config_file="config.yml"):
        with open(config_file, 'r') as f:
            self.config = yaml.safe_load(f)
        self.client = Elasticsearch(
            cloud_id=self.config['cloud_id'],
            api_key=self.config['api_key']
        )

    def get_client(self):
        return self.client

    def get_async_client(self):
        # Reuse the config loaded in __init__ instead of re-reading config.yml,
        # and keep the async client separate from the sync one.
        self.async_client = AsyncElasticsearch(
            cloud_id=self.config['cloud_id'],
            api_key=self.config['api_key'],
            request_timeout=240)
        return self.async_client
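
For reference, a minimal usage sketch for this helper (assumes a valid `config.yml`):

```python
# Sketch: open the sync client and confirm connectivity.
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()
print(client.info()["cluster_name"])
```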

**files/output.json** (the dataset read by the ingestion scripts; large file, diff not rendered)

**infra.py** (new file)
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()


def create_index_embedding():
    # Semantic index: "description" is copied into a semantic_text field that is
    # embedded by the "elser_embeddings" inference endpoint created below.
    response = client.indices.create(
        index="grocery-catalog-elser",
        mappings={
            "properties": {
                "id": {
                    "type": "integer"
                },
                "name": {
                    "type": "text"
                },
                "description": {
                    "type": "text",
                    "copy_to": "description_embedding"
                },
                "category": {
                    "type": "keyword"
                },
                "brand": {
                    "type": "keyword"
                },
                "price": {
                    "type": "float"
                },
                "unit": {
                    "type": "keyword"
                },
                "description_embedding": {
                    "type": "semantic_text",
                    "inference_id": "elser_embeddings"
                }
            }
        }
    )
    print(response)


def create_inference():
    # Create the ELSER sparse-embedding inference endpoint referenced by the
    # semantic_text field's inference_id.
    response = client.inference.put(
        inference_id="elser_embeddings",
        task_type="sparse_embedding",
        body={
            "service": "elser",
            "service_settings": {
                "num_allocations": 1,
                "num_threads": 1
            }
        })
    print(response)


if __name__ == '__main__':
    # The inference endpoint must exist before the index that references it.
    create_inference()
    create_index_embedding()
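
To double-check what `infra.py` created, a hedged sketch that fetches the index mapping and the inference endpoint:

```python
# Sketch: inspect the semantic index mapping and the ELSER inference endpoint.
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()
print(client.indices.get_mapping(index="grocery-catalog-elser"))
print(client.inference.get(inference_id="elser_embeddings"))
```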
**infra_lexical_index.py** (new file)
from elasticsearch_connection import ElasticsearchConnection

client = ElasticsearchConnection().get_client()


def create_index():
    # Lexical index: plain text/keyword mappings, no embedding field.
    response = client.indices.create(
        index="grocery-catalog",
        mappings={
            "properties": {
                "id": {
                    "type": "integer"
                },
                "name": {
                    "type": "text"
                },
                "description": {
                    "type": "text"
                },
                "category": {
                    "type": "keyword"
                },
                "brand": {
                    "type": "keyword"
                },
                "price": {
                    "type": "float"
                },
                "unit": {
                    "type": "keyword"
                }
            }
        }
    )
    print(response)


if __name__ == '__main__':
    create_index()
**ingestion.py** (new file)
import asyncio
import json

from elasticsearch import helpers

from elasticsearch_connection import ElasticsearchConnection

async_client = ElasticsearchConnection().get_async_client()


def partition_list(lst, chunk_size):
    # Split a list into chunks of at most chunk_size items.
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


async def index_data():
    # Load the dataset and wrap each document in a bulk action for the semantic index.
    with open('files/output.json', 'r') as file:
        data_json = json.load(file)
    documents = []
    for doc in data_json:
        documents.append(
            {
                "_index": "grocery-catalog-elser",
                "_source": doc,
            }
        )

    # Send the documents in batches of 500 to keep each bulk request small.
    partitions = partition_list(documents, 500)

    for i, partition in enumerate(partitions):
        print(f"partition {i + 1}")
        await async_bulk_indexing(async_client, partition)


async def async_bulk_indexing(client, documents):
    # stats_only=True makes async_bulk return (successes, failures) as counts.
    success, failed = await helpers.async_bulk(client, documents, stats_only=True)
    print(f"Successfully indexed {success} documents. Failed to index {failed} documents.")


async def main():
    await index_data()
    await async_client.close()


# asyncio.run() replaces the deprecated get_event_loop()/run_until_complete() pattern.
asyncio.run(main())
**ingestion_lexical_index.py** (new file)
import asyncio
import json

from elasticsearch import helpers

from elasticsearch_connection import ElasticsearchConnection

async_client = ElasticsearchConnection().get_async_client()


def partition_list(lst, chunk_size):
    # Split a list into chunks of at most chunk_size items.
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


async def index_data():
    # Load the dataset and wrap each document in a bulk action for the lexical index.
    with open('files/output.json', 'r') as file:
        data_json = json.load(file)
    documents = []
    for doc in data_json:
        documents.append(
            {
                "_index": "grocery-catalog",
                "_source": doc,
            }
        )

    # Send the documents in batches of 500 to keep each bulk request small.
    partitions = partition_list(documents, 500)

    for i, partition in enumerate(partitions):
        print(f"partition {i + 1}")
        await async_bulk_indexing(async_client, partition)


async def async_bulk_indexing(client, documents):
    # stats_only=True makes async_bulk return (successes, failures) as counts.
    success, failed = await helpers.async_bulk(client, documents, stats_only=True)
    print(f"Successfully indexed {success} documents. Failed to index {failed} documents.")


async def main():
    await index_data()
    await async_client.close()


# asyncio.run() replaces the deprecated get_event_loop()/run_until_complete() pattern.
asyncio.run(main())
**requirements.txt** (new file)
elasticsearch
aiohttp
pyyaml
pandas
**search.py** (new file)
import pandas as pd

from elasticsearch_connection import ElasticsearchConnection

es_client = ElasticsearchConnection().get_client()

term = "seafood for grilling"
size = 5


def format_text(description, line_length=120):
    # Truncate the text to at most `line_length` words (the unit is words, not characters).
    words = description.split()
    if len(words) <= line_length:
        return description
    else:
        return ' '.join(words[:line_length]) + '...'


def search_semantic(term):
    # Semantic search: a `semantic` query against the ELSER-backed semantic_text field.
    result = []
    response = es_client.search(
        index="grocery-catalog-elser",
        size=size,
        source_excludes="description_embedding",
        query={
            "semantic": {
                "field": "description_embedding",
                "query": term
            }
        })

    for hit in response["hits"]["hits"]:
        score = hit["_score"]
        name = format_text(hit["_source"]["name"], line_length=10)
        description = hit["_source"]["description"]
        formatted_description = format_text(description)
        result.append({
            'score': score,
            'name': name,
            'description': formatted_description,
        })
    return result


def search_lexical(term):
    # Lexical search: a multi_match query against the plain-text index.
    result = []
    response = es_client.search(
        index="grocery-catalog",
        size=size,
        query={
            "multi_match": {
                "query": term,
                "fields": [
                    "name",
                    "description"]
            }
        }
    )

    for hit in response["hits"]["hits"]:
        score = hit["_score"]
        name = format_text(hit["_source"]["name"], line_length=10)
        description = hit["_source"]["description"]
        result.append({
            'score': score,
            'name': name,
            'description': description,
        })
    return result


if __name__ == '__main__':
    rs1 = search_semantic(term)
    rs2 = search_lexical(term)

    # One row per hit; fall back to an empty frame when a search returns nothing.
    df1 = pd.DataFrame(rs1)[['name', 'score']] if rs1 else pd.DataFrame(columns=['name', 'score'])
    df1['Search Type'] = 'Semantic'

    df2 = pd.DataFrame(rs2)[['name', 'score']] if rs2 else pd.DataFrame(columns=['name', 'score'])
    df2['Search Type'] = 'Lexical'

    tabela = pd.concat([df1, df2], axis=0).reset_index(drop=True)
    tabela = tabela[['Search Type', 'name', 'score']]
    tabela.columns = ['Search Type', 'Name', 'Score']

    # Left-justify the columns so the console output lines up.
    tabela['Name'] = tabela['Name'].astype(str).str.ljust(15)
    tabela['Score'] = tabela['Score'].astype(str).str.ljust(5)

    print(tabela.to_string(index=False))