Skip to content

Commit

Permalink
Merge pull request #1660 from UUDigitalHumanitieslab/bugfix/es-index-…
Browse files Browse the repository at this point in the history
…version

fix: let es_index.create return versioned index name
  • Loading branch information
BeritJanssen authored Sep 25, 2024
2 parents d2ffc72 + 3b89aca commit a512140
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 21 deletions.
3 changes: 2 additions & 1 deletion backend/corpora/jewishmigration/test_jewishmigration.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def __init__(self, mock_content):
def json(self):
return self.mock_content

def mock_get(_dummy_path):

def mock_get(_dummy_path, headers=None):
return MockResponse(mock_content=[
{
"source": "Le Bohec 1981 n. 71",
Expand Down
9 changes: 8 additions & 1 deletion backend/es/es_alias.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
import re

from addcorpus.models import Corpus
from addcorpus.models import Corpus, CorpusConfiguration
from ianalyzer.elasticsearch import elasticsearch

import logging
Expand Down Expand Up @@ -51,6 +51,13 @@ def alias(corpus: Corpus, clean=False):
logger.info('Done updating aliases')


def get_current_index_name(corpus: "CorpusConfiguration", client) -> str:
    """
    Get the name of the index currently associated with a corpus.

    The lookup uses the corpus' alias, falling back to its index name when
    no alias is set. When several indices match, the one with the highest
    numeric version suffix (`<name>-<N>`) is returned.

    Parameters:
        corpus: corpus configuration; `es_alias` and `es_index` are read.
        client: Elasticsearch client; only `client.indices.get` is called.

    Returns:
        The name of the matching index with the highest version number.
        Names without a numeric suffix rank below any versioned name, and
        ties are broken alphabetically.

    Raises:
        ValueError: if the lookup returns no index names at all. Errors
            raised by `client.indices.get` are propagated unchanged.
    """
    alias = corpus.es_alias or corpus.es_index
    indices = client.indices.get(index=alias)

    def version_key(index_name):
        # Compare versions as integers: a plain string comparison would rank
        # `name-9` above `name-10`.
        _, _, suffix = index_name.rpartition('-')
        version = int(suffix) if suffix.isdecimal() else -1
        return (version, index_name)

    return max(indices.keys(), key=version_key)


def get_new_version_number(client, alias, current_index=None):
'''
Get version number for a new versioned index (e.g. `indexname-1`).
Expand Down
48 changes: 29 additions & 19 deletions backend/es/es_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from addcorpus.python_corpora.load_corpus import load_corpus_definition
from addcorpus.reader import make_reader
from ianalyzer.elasticsearch import elasticsearch
from .es_alias import alias, get_new_version_number
from .es_alias import alias, get_current_index_name, get_new_version_number
import datetime

import logging
Expand Down Expand Up @@ -50,7 +50,13 @@ def _make_es_mapping(corpus_configuration: CorpusConfiguration) -> Dict:
}


def create(client: Elasticsearch, corpus: Corpus, add: bool = False, clear: bool = False, prod: bool = False):
def create(
client: Elasticsearch,
corpus: Corpus,
add: bool = False,
clear: bool = False,
prod: bool = False,
) -> str:
'''
Initialise an ElasticSearch index.
'''
Expand All @@ -59,8 +65,8 @@ def create(client: Elasticsearch, corpus: Corpus, add: bool = False, clear: bool
es_mapping = _make_es_mapping(corpus_config)

if add:
# we add document to existing index - skip creation.
return None
# we add document to existing index - skip creation, return current index
return get_current_index_name(corpus_config, client)

if clear:
logger.info('Attempting to clean old index...')
Expand Down Expand Up @@ -93,20 +99,26 @@ def create(client: Elasticsearch, corpus: Corpus, add: bool = False, clear: bool
settings=settings,
mappings=es_mapping,
)
return index_name
except RequestError as e:
if 'already_exists' not in e.error:
# ignore that the index already exist,
# raise any other errors.
raise


def populate(client: Elasticsearch, corpus: Corpus, start=None, end=None):
def populate(
client: Elasticsearch,
corpus: Corpus,
versioned_index_name: str,
start=None,
end=None,
):
'''
Populate an ElasticSearch index from the corpus' source files.
'''
corpus_config = corpus.configuration
corpus_name = corpus.name
index_name = corpus_config.es_index
reader = make_reader(corpus)

logger.info('Attempting to populate index...')
Expand All @@ -121,11 +133,12 @@ def populate(client: Elasticsearch, corpus: Corpus, start=None, end=None):
# can be sent to ElasticSearch in bulk
actions = (
{
'_op_type': 'index',
'_index': index_name,
'_id': doc.get('id'),
'_source': doc
} for doc in docs
"_op_type": "index",
"_index": versioned_index_name,
"_id": doc.get("id"),
"_source": doc,
}
for doc in docs
)

corpus_server = settings.SERVERS[
Expand Down Expand Up @@ -176,26 +189,23 @@ def perform_indexing(
logger.info('retry on timeout: {}'.format(
vars(client).get('_retry_on_timeout'))
)
create(client, corpus, add, clear, prod)
versioned_index_name = create(client, corpus, add, clear, prod)
client.cluster.health(wait_for_status='yellow')

if mappings_only:
logger.info('Created index `{}` with mappings only.'.format(index_name))
return

populate(client, corpus, start=start, end=end)
populate(client, corpus, versioned_index_name, start=start, end=end)

logger.info('Finished indexing `{}` to index `{}`.'.format(
corpus_name, index_name))

if prod:
logger.info('Updating settings for index `{}`'.format(
index_name))
logger.info("Updating settings for index `{}`".format(versioned_index_name))
client.indices.put_settings(
settings={'number_of_replicas': 1},
index=index_name
settings={"number_of_replicas": 1}, index=versioned_index_name
)
if rollover:
logger.info('Adjusting alias for index `{}`'.format(
index_name))
logger.info("Adjusting alias for index `{}`".format(versioned_index_name))
alias(corpus) # not deleting old index, so we can roll back
15 changes: 15 additions & 0 deletions backend/es/tests/test_es_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,18 @@ def test_mismatch_corpus_index_names(mock_corpus, corpus_definition, es_index_cl
def test_db_only_corpus(json_mock_corpus, es_client, index_json_mock_corpus):
    """
    Check the document count in the index of the JSON mock corpus.

    NOTE(review): `index_json_mock_corpus` is requested but not used directly;
    presumably the fixture populates the index before the test runs - confirm.
    """
    index_name = json_mock_corpus.configuration.es_index
    count_response = es_client.count(index=index_name)
    assert count_response.get('count') == 10


def test_indexing_with_version(mock_corpus, corpus_definition, es_index_client):
    """
    Run a full indexing pass with `prod` and `rollover` enabled and check
    that the versioned index `times-test-1` exists afterwards.
    """
    corpus = Corpus.objects.get(name=mock_corpus)
    perform_indexing(
        corpus,
        START,
        END,
        mappings_only=False,
        add=False,
        clear=False,
        prod=True,
        rollover=True,
    )
    # Assert truthiness directly rather than comparing with `== True`.
    assert es_index_client.indices.exists(index="times-test-1")

0 comments on commit a512140

Please sign in to comment.