Skip to content

Commit

Permalink
use old index mapping for tmp index during reingestion
Browse files Browse the repository at this point in the history
it uses both for searching during reingestion and if
there is a mapping change in a field used in the query
it would fail.
  • Loading branch information
petrjasek committed Dec 17, 2024
1 parent da295ab commit 7b37a80
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 28 deletions.
40 changes: 12 additions & 28 deletions eve_elastic/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,13 +1048,14 @@ def reindex(self, resource, *, requests_per_second=1000): # noqa: F811
es = self.elastic(resource)
alias = self._resource_index(resource)
settings = self._resource_config(resource, "SETTINGS")
mapping = self._resource_mapping(resource)
mappings = self._resource_mapping(resource)

old_index = None
old_index = old_mappings = None
try:
indexes = es.indices.get_alias(name=alias)
for index, aliases in indexes.items():
old_index = index
old_mappings = es.indices.get_mapping(index=index)
specs = aliases["aliases"][alias]
if specs and specs["is_write_index"]:
break
Expand All @@ -1066,8 +1067,10 @@ def reindex(self, resource, *, requests_per_second=1000): # noqa: F811

# create new index
new_index = generate_index_name(alias)
self._create_index(es, new_index, settings)
self._put_mapping(es, new_index, mapping)
es.indices.create(index=new_index, body={
"settings": {"index": settings["settings"]} if settings else {},
"mappings": fix_mapping(mappings) if mappings else {},
})

print("NEW INDEX", new_index)

Expand Down Expand Up @@ -1107,31 +1110,12 @@ def reindex(self, resource, *, requests_per_second=1000): # noqa: F811

# tmp index will be used for new items arriving during reindex
tmp_index = f"{old_index}-tmp"
self._create_index(es, tmp_index, settings)
self._put_mapping(es, tmp_index, mapping)
print("TMP INDEX", tmp_index)
es.indices.rollover(alias=alias, new_index=tmp_index, body={
"mappings": old_mappings[old_index]["mappings"] if old_mappings else mappings,
"settings": {"index": settings["settings"]} if settings else None,
})

# add tmp index as writable
es.indices.update_aliases(
body={
"actions": [
{
"add": { # add tmp index as write index
"index": tmp_index,
"alias": alias,
"is_write_index": True,
},
},
{
"add": { # make sure the old index is not write index
"index": old_index,
"alias": alias,
"is_write_index": False,
},
},
],
}
)
print("TMP INDEX", tmp_index)

_background_reindex(
es, old_index, new_index, requests_per_second=requests_per_second
Expand Down
19 changes: 19 additions & 0 deletions test/test_elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,25 @@ def test_reindex_without_old_index(self):

assert es.indices.exists_alias(alias)

def test_reindex(self):
ITEMS = "items"
with self.app.app_context():
elastic = self.app.data
elastic.insert(ITEMS, [{"uri": "foo", "name": "item"}])
old_index = elastic.get_index(ITEMS)
elastic.reindex(ITEMS)
new_index = elastic.get_index(ITEMS)
assert old_index != new_index
docs = elastic.search(
{
"query": {
"match_all": {},
},
},
ITEMS,
)
self.assertEqual(1, docs.count())


class TestElasticSearchWithSettings(TestCase):
resource = "items"
Expand Down

0 comments on commit 7b37a80

Please sign in to comment.