Skip to content

Commit

Permalink
refactor reindexing
Browse files Browse the repository at this point in the history
it creates a new write index with new mapping, then
reindexes the old index to the new one, and after that
deletes the old index.
  • Loading branch information
petrjasek committed Mar 8, 2024
1 parent 4efbb6b commit 4b7465a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 36 deletions.
111 changes: 76 additions & 35 deletions eve_elastic/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
import pytz # NOQA
import logging
import elasticsearch
import time

from bson import ObjectId
from elasticsearch.helpers import bulk, reindex
from elasticsearch.helpers import bulk, reindex # noqa

from uuid import uuid4
from flask import request, abort, json, current_app as app
from eve.utils import config
from eve.io.base import DataLayer
from eve.io.mongo.parser import parse, ParseError
from elasticsearch import Elasticsearch


logging.basicConfig()
Expand Down Expand Up @@ -332,14 +334,14 @@ def set_sort(query, sort):
query["sort"].append(sort_dict)


def get_es(url, **kwargs):
def get_es(url, **kwargs) -> Elasticsearch:
"""Create elasticsearch client instance.
:param url: elasticsearch url
"""
urls = [url] if isinstance(url, str) else url
kwargs.setdefault("serializer", ElasticJSONSerializer())
es = elasticsearch.Elasticsearch(urls, **kwargs)
es = Elasticsearch(urls, **kwargs)
return es


Expand Down Expand Up @@ -989,7 +991,7 @@ def _resource_config(self, resource, key, default=None):
px = self._resource_prefix(resource)
return app.config.get("%s_%s" % (px, key), default)

def elastic(self, resource):
def elastic(self, resource) -> Elasticsearch:
"""Get ElasticSearch instance for given resource."""
px = self._resource_prefix(resource)

Expand Down Expand Up @@ -1041,46 +1043,85 @@ def search(self, query, resources, params=None):
except elasticsearch.exceptions.RequestError:
raise

def reindex(self, resource):
def reindex(self, resource, requests_per_second=100):
es = self.elastic(resource)
alias = self._resource_index(resource)
settings = self._resource_config(resource, "SETTINGS")
mapping = self._resource_mapping(resource)
new_index = False
try:
old_index = self.get_index(resource)
except elasticsearch.exceptions.NotFoundError:
new_index = True

old_index = None
indexes = es.indices.get_alias(index=alias)
for index, aliases in indexes.items():
old_index = index
specs = aliases["aliases"][alias]
if specs and specs["is_write_index"]:
break

if old_index:
print("OLD INDEX", old_index)

# create new index
index = generate_index_name(alias)
print("create", index)
self._create_index(es, index, settings)
self._put_mapping(es, index, mapping)

if not new_index:
# reindex data
print("reindex", alias, index)
reindex(es, alias, index)

# remove old alias
print("remove alias", alias, old_index)
try:
es.indices.delete_alias(index=old_index, name=alias)
except elasticsearch.exceptions.NotFoundError:
# this was not an alias, but an index. will be removed in next step
pass
new_index = generate_index_name(alias)
self._create_index(es, new_index, settings)
self._put_mapping(es, new_index, mapping)

# remove old index
print("remove index", old_index)
es.indices.delete(old_index)
print("NEW INDEX", new_index)

if not old_index:
es.indices.put_alias(index=new_index, name=alias)
print("There is no index to reindex from, done.")
return

# create alias for new index
print("put", alias, index)
es.indices.put_alias(index=index, name=alias)
# add the new index as writable
es.indices.update_aliases(
body={
"actions": [
{
"add": { # add new index as write index
"index": new_index,
"alias": alias,
"is_write_index": True,
},
},
{
"add": { # make sure the old index is not write index
"index": old_index,
"alias": alias,
"is_write_index": False,
},
},
],
}
)

# reindex data from old to new index
resp = es.reindex(
body={
"source": {"index": old_index},
"dest": {"index": new_index, "version_type": "external"},
},
requests_per_second=requests_per_second,
wait_for_completion=False,
refresh=True,
)
task_id = resp["task"]
print(f"REINDEXING task {task_id}")

while True:
time.sleep(2.0)
task_info = es.tasks.get(task_id=task_id)
if task_info["completed"]:
total = task_info["response"]["total"]
took = int(task_info["response"]["took"] / 1000) # ms to s
print()
print(f"DONE reindexing {total} items, took {took}s")
break
else:
print(".", end="")

print("refresh", index)
es.indices.refresh(index)
# remove old index
print("REMOVE OLD INDEX", old_index)
es.indices.delete(index=old_index)


def build_elastic_query(doc):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"arrow>=0.4.2",
"ciso8601>=1.0.2,<2",
"pytz>=2015.4",
"elasticsearch>=7.0,<7.14",
"elasticsearch>=7.0,<8.0",
],
classifiers=[
"Development Status :: 4 - Beta",
Expand Down

0 comments on commit 4b7465a

Please sign in to comment.