Skip to content

Commit

Permalink
Build: refactor search indexing process
Browse files Browse the repository at this point in the history
Currently, we walk the entire project directory to
apply two operations: index files in ES, and keep track of index/404 files.
These two operations are independent, but in our code they are kind of
mixed together in order to avoid walking the project directory twice.

I have abstracted the processing of the files with a "Indexer" class,
which is responsible for doing an operation on a file,
and at the end it can collect the results.
  • Loading branch information
stsewd committed Oct 2, 2024
1 parent ed7c06e commit a182899
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 132 deletions.
323 changes: 193 additions & 130 deletions readthedocs/projects/tasks/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,193 @@
log = structlog.get_logger(__name__)


class Indexer:

"""
Base class for doing operations over each file from a build.
The process method should be implemented to apply the operation
over each file, and the collect method should be implemented
to collect the results of the operation after processing all files.
`sync_id` is used to differentiate the files from the current sync from the previous one.
"""

def process(self, html_file: HTMLFile, sync_id: int):
raise NotImplementedError

def collect(self, sync_id: int):
raise NotImplementedError


class SearchIndexer(Indexer):

"""
Index HTML files in ElasticSearch.
We respect the search ranking and ignore patterns defined in the project's search configuration.
If search_index_name is provided, it will be used as the search index name,
otherwise the default one will be used.
"""

def __init__(
self,
project: Project,
version: Version,
search_ranking: dict[str, int],
search_ignore: list[str],
search_index_name: str | None = None,
):
self.project = project
self.version = version
self.search_ranking = search_ranking
self.search_ignore = search_ignore
self._reversed_search_ranking = list(reversed(search_ranking.items()))
self.search_index_name = search_index_name
self._html_files_to_index = []

def process(self, html_file: HTMLFile, sync_id: int):
for pattern in self.search_ignore:
if fnmatch(html_file.path, pattern):
return

for pattern, rank in self._reversed_search_ranking:
if fnmatch(html_file.path, pattern):
html_file.rank = rank
break

self._html_files_to_index.append(html_file)

def collect(self, sync_id: int):
# Index new files in ElasticSearch.
if self._html_files_to_index:
index_objects(
document=PageDocument,
objects=self._html_files_to_index,
index_name=self.search_index_name,
# Pages are indexed in small chunks to avoid a
# large payload that will probably timeout ES.
chunk_size=100,
)

# Remove old HTMLFiles from ElasticSearch.
remove_indexed_files(
project_slug=self.project.slug,
version_slug=self.version.slug,
sync_id=sync_id,
index_name=self.search_index_name,
)


class IndexFileIndexer(Indexer):

"""
Create imported files of interest in the DB.
We only save the top-level 404 file and index files,
we don't need to keep track of all files.
These files are queried by proxito instead of checking S3 (slow).
"""

def __init__(self, project: Project, version: Version):
self.project = project
self.version = version
self._html_files_to_save = []

def process(self, html_file: HTMLFile, sync_id: int):
if html_file.path == "404.html" or html_file.name == "index.html":
self._html_files_to_save.append(html_file)

def collect(self, sync_id: int):
if self._html_files_to_save:
HTMLFile.objects.bulk_create(self._html_files_to_save)

# Delete imported files from the previous build of the version.
self.version.imported_files.exclude(build=sync_id).delete()


def _get_indexers(*, version, search_ranking, search_ignore, search_index_name=None):
indexers = []
# NOTE: The search indexer must be before the index file indexer.
# This is because saving the objects in the DB will give them an id,
# and we neeed this id to be `None` when indexing the objects in ES.
# ES will generate a unique id for each document.
# NOTE: If the version is external, we don't create a search index for it.
if not version.is_external:
search_indexer = SearchIndexer(
project=version.project,
version=version,
search_ranking=search_ranking,
search_ignore=search_ignore,
search_index_name=search_index_name,
)
indexers.append(search_indexer)
index_file_indexer = IndexFileIndexer(
project=version.project,
version=version,
)
indexers.append(index_file_indexer)
return indexers


def _process_files(*, version: Version, indexers: list[Indexer]):
storage_path = version.project.get_storage_path(
type_="html",
version_slug=version.slug,
include_file=False,
version_type=version.type,
)
# A sync ID is a number different than the current `build` attribute (pending rename),
# it's used to differentiate the files from the current sync from the previous one.
# This is useful to easily delete the previous files from the DB and ES.
# See https://github.com/readthedocs/readthedocs.org/issues/10734.
imported_file_build_id = version.imported_files.values_list(
"build", flat=True
).first()
sync_id = imported_file_build_id + 1 if imported_file_build_id else 1

log.debug(
"Using sync ID for search indexing",
sync_id=sync_id,
)

for root, __, filenames in build_media_storage.walk(storage_path):
for filename in filenames:
# We don't care about non-HTML files (for now?).
if not filename.endswith(".html"):
continue

full_path = build_media_storage.join(root, filename)
# Generate a relative path for storage similar to os.path.relpath
relpath = full_path.removeprefix(storage_path).lstrip("/")

html_file = HTMLFile(
project=version.project,
version=version,
path=relpath,
name=filename,
# TODO: We are setting the commit field since it's required,
# but it isn't used, and will be removed in the future
# together with other fields.
commit="unknown",
build=sync_id,
)
for indexer in indexers:
indexer.process(html_file, sync_id)

for indexer in indexers:
indexer.collect(sync_id)

# This signal is used for purging the CDN.
files_changed.send(
sender=Project,
project=version.project,
version=version,
)
return sync_id


@app.task(queue="reindex")
def index_build(build_id):
"""Create imported files and search index for the build."""
Expand Down Expand Up @@ -49,13 +236,14 @@ def index_build(build_id):
search_ignore = search_config.get("ignore", [])

try:
_create_imported_files_and_search_index(
indexers = _get_indexers(
version=version,
search_ranking=search_ranking,
search_ignore=search_ignore,
)
_process_files(version=version, indexers=indexers)
except Exception:
log.exception("Failed during creation of new files")
log.exception("Failed to index build")


@app.task(queue="reindex")
Expand Down Expand Up @@ -99,14 +287,15 @@ def reindex_version(version_id, search_index_name=None):
search_ignore = search_config.get("ignore", [])

try:
_create_imported_files_and_search_index(
indexers = _get_indexers(
version=version,
search_ranking=search_ranking,
search_ignore=search_ignore,
search_index_name=search_index_name,
)
_process_files(version=version, indexers=indexers)
except Exception:
log.exception("Failed during creation of new files")
log.exception("Failed to re-index version")


@app.task(queue="reindex")
Expand Down Expand Up @@ -141,129 +330,3 @@ def remove_search_indexes(project_slug, version_slug=None):
project_slug=project_slug,
version_slug=version_slug,
)


def _create_imported_files_and_search_index(
*, version, search_ranking, search_ignore, search_index_name=None
):
"""
Create imported files and search index for the build of the version.
If the version is external, we don't create a search index for it, only imported files.
After the process is completed, we delete the files and search index that
don't belong to the current build id.
:param search_index: If provided, it will be used as the search index name,
otherwise the default one will be used.
"""
storage_path = version.project.get_storage_path(
type_="html",
version_slug=version.slug,
include_file=False,
version_type=version.type,
)
# A sync ID is a number different than the current `build` attribute (pending rename),
# it's used to differentiate the files from the current sync from the previous one.
# This is useful to easily delete the previous files from the DB and ES.
# See https://github.com/readthedocs/readthedocs.org/issues/10734.
imported_file_build_id = version.imported_files.values_list(
"build", flat=True
).first()
sync_id = imported_file_build_id + 1 if imported_file_build_id else 1

log.debug(
"Using sync ID for search indexing",
sync_id=sync_id,
)

html_files_to_index = []
html_files_to_save = []
reverse_rankings = list(reversed(search_ranking.items()))
for root, __, filenames in build_media_storage.walk(storage_path):
for filename in filenames:
# We don't care about non-HTML files
if not filename.endswith(".html"):
continue

full_path = build_media_storage.join(root, filename)

# Generate a relative path for storage similar to os.path.relpath
relpath = full_path.replace(storage_path, "", 1).lstrip("/")

skip_search_index = False
if version.is_external:
# Never index files from external versions.
skip_search_index = True
else:
for pattern in search_ignore:
if fnmatch(relpath, pattern):
skip_search_index = True
break

page_rank = 0
# If the file is ignored, we don't need to check for its ranking.
if not skip_search_index:
# Last pattern to match takes precedence
for pattern, rank in reverse_rankings:
if fnmatch(relpath, pattern):
page_rank = rank
break

html_file = HTMLFile(
project=version.project,
version=version,
path=relpath,
name=filename,
rank=page_rank,
# TODO: We are setting the commit field since it's required,
# but it isn't used, and will be removed in the future
# together with other fields.
commit="unknown",
build=sync_id,
ignore=skip_search_index,
)

if not skip_search_index:
html_files_to_index.append(html_file)

# Create the imported file only if it's a top-level 404 file,
# or if it's an index file. We don't need to keep track of all files.
tryfiles = ["index.html"]
if relpath == "404.html" or filename in tryfiles:
html_files_to_save.append(html_file)

# We first index the files in ES, and then save the objects in the DB.
# This is because saving the objects in the DB will give them an id,
# and we neeed this id to be `None` when indexing the objects in ES.
# ES will generate a unique id for each document.
if html_files_to_index:
index_objects(
document=PageDocument,
objects=html_files_to_index,
index_name=search_index_name,
# Pages are indexed in small chunks to avoid a
# large payload that will probably timeout ES.
chunk_size=100,
)

# Remove old HTMLFiles from ElasticSearch
remove_indexed_files(
project_slug=version.project.slug,
version_slug=version.slug,
sync_id=sync_id,
index_name=search_index_name,
)

if html_files_to_save:
HTMLFile.objects.bulk_create(html_files_to_save)

# Delete imported files from the previous build of the version.
version.imported_files.exclude(build=sync_id).delete()

# This signal is used for purging the CDN.
files_changed.send(
sender=Project,
project=version.project,
version=version,
)
return sync_id
5 changes: 3 additions & 2 deletions readthedocs/rtd_tests/tests/test_imported_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from readthedocs.builds.constants import EXTERNAL
from readthedocs.projects.models import HTMLFile, ImportedFile, Project
from readthedocs.projects.tasks.search import _create_imported_files_and_search_index
from readthedocs.projects.tasks.search import _get_indexers, _process_files
from readthedocs.search.documents import PageDocument

base_dir = os.path.dirname(os.path.dirname(__file__))
Expand Down Expand Up @@ -46,11 +46,12 @@ def _manage_imported_files(self, version, search_ranking=None, search_ignore=Non
"""Helper function for the tests to create and sync ImportedFiles."""
search_ranking = search_ranking or {}
search_ignore = search_ignore or []
return _create_imported_files_and_search_index(
indexers = _get_indexers(
version=version,
search_ranking=search_ranking,
search_ignore=search_ignore,
)
return _process_files(version=version, indexers=indexers)

def _copy_storage_dir(self):
"""Copy the test directory (rtd_tests/files) to storage"""
Expand Down

0 comments on commit a182899

Please sign in to comment.