From 4b7465ad4af3ee1c18767d9feda325a7b3207ee2 Mon Sep 17 00:00:00 2001 From: Petr Jasek Date: Fri, 8 Mar 2024 11:47:58 +0100 Subject: [PATCH] refactor reindexing it creates a new write index with new mapping, then reindexes the old index to the new one, and after that deletes the old index. --- eve_elastic/elastic.py | 111 ++++++++++++++++++++++++++++------------- setup.py | 2 +- 2 files changed, 77 insertions(+), 36 deletions(-) diff --git a/eve_elastic/elastic.py b/eve_elastic/elastic.py index 6ff298b..4d64afe 100644 --- a/eve_elastic/elastic.py +++ b/eve_elastic/elastic.py @@ -5,15 +5,17 @@ import pytz # NOQA import logging import elasticsearch +import time from bson import ObjectId -from elasticsearch.helpers import bulk, reindex +from elasticsearch.helpers import bulk, reindex # noqa from uuid import uuid4 from flask import request, abort, json, current_app as app from eve.utils import config from eve.io.base import DataLayer from eve.io.mongo.parser import parse, ParseError +from elasticsearch import Elasticsearch logging.basicConfig() @@ -332,14 +334,14 @@ def set_sort(query, sort): query["sort"].append(sort_dict) -def get_es(url, **kwargs): +def get_es(url, **kwargs) -> Elasticsearch: """Create elasticsearch client instance. :param url: elasticsearch url """ urls = [url] if isinstance(url, str) else url kwargs.setdefault("serializer", ElasticJSONSerializer()) - es = elasticsearch.Elasticsearch(urls, **kwargs) + es = Elasticsearch(urls, **kwargs) return es @@ -989,7 +991,7 @@ def _resource_config(self, resource, key, default=None): px = self._resource_prefix(resource) return app.config.get("%s_%s" % (px, key), default) - def elastic(self, resource): + def elastic(self, resource) -> Elasticsearch: """Get ElasticSearch instance for given resource.""" px = self._resource_prefix(resource) @@ -1041,46 +1043,85 @@ def search(self, query, resources, params=None): except elasticsearch.exceptions.RequestError: raise - def reindex(self, resource): + def reindex(self, resource, requests_per_second=100): es = self.elastic(resource) alias = self._resource_index(resource) settings = self._resource_config(resource, "SETTINGS") mapping = self._resource_mapping(resource) - new_index = False - try: - old_index = self.get_index(resource) - except elasticsearch.exceptions.NotFoundError: - new_index = True + + old_index = None + indexes = es.indices.get_alias(index=alias) + for index, aliases in indexes.items(): + old_index = index + specs = aliases["aliases"][alias] + if specs and specs["is_write_index"]: + break + + if old_index: + print("OLD INDEX", old_index) # create new index - index = generate_index_name(alias) - print("create", index) - self._create_index(es, index, settings) - self._put_mapping(es, index, mapping) - - if not new_index: - # reindex data - print("reindex", alias, index) - reindex(es, alias, index) - - # remove old alias - print("remove alias", alias, old_index) - try: - es.indices.delete_alias(index=old_index, name=alias) - except elasticsearch.exceptions.NotFoundError: - # this was not an alias, but an index. will be removed in next step - pass + new_index = generate_index_name(alias) + self._create_index(es, new_index, settings) + self._put_mapping(es, new_index, mapping) - # remove old index - print("remove index", old_index) - es.indices.delete(old_index) + print("NEW INDEX", new_index) + + if not old_index: + es.indices.put_alias(index=new_index, name=alias) + print("There is no index to reindex from, done.") + return - # create alias for new index - print("put", alias, index) - es.indices.put_alias(index=index, name=alias) + # add the new index as writable + es.indices.update_aliases( + body={ + "actions": [ + { + "add": { # add new index as write index + "index": new_index, + "alias": alias, + "is_write_index": True, + }, + }, + { + "add": { # make sure the old index is not write index + "index": old_index, + "alias": alias, + "is_write_index": False, + }, + }, + ], + } + ) + + # reindex data from old to new index + resp = es.reindex( + body={ + "source": {"index": old_index}, + "dest": {"index": new_index, "version_type": "external"}, + }, + requests_per_second=requests_per_second, + wait_for_completion=False, + refresh=True, + ) + task_id = resp["task"] + print(f"REINDEXING task {task_id}") + + while True: + time.sleep(2.0) + task_info = es.tasks.get(task_id=task_id) + if task_info["completed"]: + total = task_info["response"]["total"] + took = int(task_info["response"]["took"] / 1000) # ms to s + print() + print(f"DONE reindexing {total} items, took {took}s") + break + else: + print(".", end="") - print("refresh", index) - es.indices.refresh(index) + # remove old index + print("REMOVE OLD INDEX", old_index) + es.indices.delete(index=old_index) def build_elastic_query(doc): diff --git a/setup.py b/setup.py index b57e655..aa7d0df 100755 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ "arrow>=0.4.2", "ciso8601>=1.0.2,<2", "pytz>=2015.4", - "elasticsearch>=7.0,<7.14", + "elasticsearch>=7.0,<8.0", ], classifiers=[ "Development Status :: 4 - Beta",