Skip to content

Commit

Permalink
Merge pull request #7 from untergeek/ilm/phase_in_list
Browse files Browse the repository at this point in the history
A bunch of typing fixes, and improved ILM phase monitoring
  • Loading branch information
untergeek authored Jan 25, 2025
2 parents 0e94cd2 + d7c8c80 commit e945ca5
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/es_wait/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Top-level init file"""

__version__ = '0.9.1'
__version__ = '0.9.2'
from .exists import Exists
from .health import Health
from .index import Index
Expand Down
6 changes: 4 additions & 2 deletions src/es_wait/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@

logger = logging.getLogger('es_wait.Waiter')

# pylint: disable=R0912,R1702


class Waiter:
"""Waiter Parent Class"""

def __init__(
self,
client: 'Elasticsearch',
pause: float = 9, # The delay between checks
timeout: float = -1, # How long is too long
pause: float = 9.0, # The delay between checks
timeout: float = -1.0, # How long is too long
) -> None:
#: An :py:class:`Elasticsearch <elasticsearch.Elasticsearch>` client instance
self.client = client
Expand Down
19 changes: 14 additions & 5 deletions src/es_wait/exists.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(
self,
client: 'Elasticsearch',
pause: float = 1.5,
timeout: float = 15,
timeout: float = 15.0,
name: str = '',
kind: t.Literal[
'index',
Expand All @@ -32,7 +32,8 @@ def __init__(
'template',
'component_template',
'component',
] = '',
'undef',
] = 'undef',
) -> None:

super().__init__(client=client, pause=pause, timeout=timeout)
Expand All @@ -41,7 +42,13 @@ def __init__(
#: What kind of entity
self.kind = kind
self.empty_check('name')
self.empty_check('kind')
if kind == 'undef':
msg = (
'kind must be one of index, data_stream, index_template, '
'template, component_template, or component'
)
logger.error(msg)
raise ValueError(msg)
self.waitstr = f'for {self.kindmap} "{name}" to exist'
logger.debug('Waiting %s...', self.waitstr)

Expand Down Expand Up @@ -79,13 +86,15 @@ def check(self) -> bool:
},
'FAIL': {'func': False, 'kwargs': {}},
}
return bool(doit[self.kindmap]['func'](**doit[self.kindmap]['kwargs']))
return bool(
doit[self.kindmap]['func'](**doit[self.kindmap]['kwargs']) # type: ignore
)

@property
def kindmap(
self,
) -> t.Literal[
'index or datastream', 'index template', 'component template', 'FAIL'
'index or data_stream', 'index template', 'component template', 'FAIL'
]:
"""
This method helps map :py:attr:`kind` to the proper 'exists' API call, as well
Expand Down
17 changes: 12 additions & 5 deletions src/es_wait/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ def __init__(
self,
client: 'Elasticsearch',
pause: float = 1.5,
timeout: float = 15,
timeout: float = 15.0,
action: t.Literal[
'allocation', 'cluster_routing', 'mount', 'replicas', 'shrink'
] = None,
'allocation', 'cluster_routing', 'mount', 'replicas', 'shrink', 'undef'
] = 'undef',
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout)
#: The action determines the kind of response we look for in the health check
self.action = action
if action == 'undef':
msg = (
'action must be one of allocation, cluster_routing, mount, '
'replicas, or shrink'
)
logger.error(msg)
raise ValueError(msg)
self.empty_check('action')
self.waitstr = self.getwaitstr
self.do_health_report = True
Expand Down Expand Up @@ -91,14 +98,14 @@ def check(self) -> bool:
return check

@property
def getwaitstr(self) -> t.AnyStr:
def getwaitstr(self) -> str:
"""
Define the waitstr based on :py:attr:`action`
:getter: Returns the proper waitstr
:type: str
"""
retval = None
retval = ''
if self.action in self.RELO_ACTIONS:
retval = 'for cluster health to show zero relocating shards'
if self.action in self.STATUS_ACTIONS:
Expand Down
55 changes: 47 additions & 8 deletions src/es_wait/ilm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import typing as t
import logging
from dotmap import DotMap
from dotmap import DotMap # type: ignore
from elasticsearch8.exceptions import NotFoundError
from ._base import Waiter
from .exceptions import IlmWaitError
Expand All @@ -21,9 +21,9 @@ class IndexLifecycle(Waiter):
def __init__(
self,
client: 'Elasticsearch',
pause: float = 1,
timeout: float = -1,
name: t.Union[str, None] = None,
pause: float = 1.0,
timeout: float = -1.0,
name: str = '',
) -> None:

super().__init__(client=client, pause=pause, timeout=timeout)
Expand Down Expand Up @@ -69,8 +69,8 @@ def __init__(
client: 'Elasticsearch',
pause: float = 1,
timeout: float = -1,
name: t.Union[str, None] = None,
phase: t.Union[str, None] = None,
name: str = '',
phase: str = '',
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout, name=name)
#: The target ILM phase
Expand All @@ -86,7 +86,9 @@ def check(self) -> bool:
"""
Collect ILM explain data from :py:meth:`get_explain_data()`, and check for ILM
phase change completion. It will return ``True`` if the expected phase and the
actually collected phase match.
actually collected phase match. If phase is ``new``, it will return ``True`` if
the collected phase is ``new`` or higher (``hot``, ``warm``, ``cold``,
``frozen``, ``delete``).
Upstream callers need to try/catch any of :py:exc:`KeyError` (index name
changed), :py:exc:`NotFoundError <elasticsearch.exceptions.NotFoundError>`, and
Expand All @@ -99,8 +101,45 @@ def check(self) -> bool:
:type: bool
"""
explain = DotMap(self.get_explain_data())
logger.info('ILM Phase %s found.', explain.phase)
if self.phase == 'new':
logger.debug('Expecting ILM Phase new, or higher')
if self.phase_by_num(explain.phase) >= self.phase_by_num(self.phase):
return True
else:
logger.debug('Expecting ILM Phase %s', self.phase)
return bool(explain.phase == self.phase)

def phase_by_num(self, phase: str) -> int:
"""Map a phase name to a phase number"""
_ = {
'undef': 0,
'new': 1,
'hot': 2,
'warm': 3,
'cold': 4,
'frozen': 5,
'delete': 6,
}
if phase in _:
return _[phase]
return 0 # Default to 0/undef if not found

def phase_by_name(self, num: int) -> str:
"""Map a phase number to a phase name"""
_ = {
# 0: 'undef',
1: 'new',
2: 'hot',
3: 'warm',
4: 'cold',
5: 'frozen',
6: 'delete',
}
if num in _:
return _[num]
return 'undef' # Default to 'undef' if not found


class IlmStep(IndexLifecycle):
"""
Expand All @@ -115,7 +154,7 @@ def __init__(
client: 'Elasticsearch',
pause: float = 1,
timeout: float = -1,
name: t.Union[str, None] = None,
name: str = '',
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout, name=name)
self.waitstr = f'for "{self.name}" to complete the current ILM step'
Expand Down
28 changes: 16 additions & 12 deletions src/es_wait/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@ class Index(Waiter):

ACTIONS = ['allocation', 'cluster_routing', 'mount', 'replicas', 'shrink']
HEALTH_ACTIONS = ['health', 'mount', 'replicas', 'shrink']
HEALTH_ARGS = {'health': 'green'}
HEALTH_ARGS = {'status': 'green'}

def __init__(
self,
client: 'Elasticsearch',
pause: float = 1.5,
timeout: float = 15,
action: t.Literal['health', 'mount', 'replicas', 'shrink'] = None,
index: str = None,
timeout: float = 15.0,
action: t.Literal['health', 'mount', 'replicas', 'shrink', 'undef'] = 'undef',
index: str = '',
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout)
#: The action determines the kind of response we look for in the health check
self.action = action
if action == 'undef':
msg = 'action must be one of health, mount, replicas, or shrink'
logger.error(msg)
raise ValueError(msg)
self.index = index
self.empty_check('action')
self.empty_check('index')
self.resolve_index()
self.waitstr = self.getwaitstr
Expand All @@ -52,8 +55,8 @@ def argmap(self) -> t.Union[t.Dict[str, int], t.Dict[str, str]]:
@property
def check(self) -> bool:
"""
This function calls :py:meth:`cat.indices()
<elasticsearch.client.CatClient.indices>` and, based on the
This function calls :py:meth:`cluster.health()
<elasticsearch.client.ClusterClient.health>` and, based on the
return value from :py:meth:`argmap`, will return ``True`` or ``False``
depending on whether that particular keyword appears in the output, and has the
expected value.
Expand All @@ -63,9 +66,10 @@ def check(self) -> bool:
:getter: Returns if the check was complete
:type: bool
"""
res = self.client.cat.indices(index=self.index, format='json', h='health')
logger.debug('res = %s', res)
output = res[0]
output = dict(
self.client.cluster.health(index=self.index, filter_path='status')
)
logger.debug('output = %s', output)
check = True
args = self.argmap()
for key, value in args.items():
Expand Down Expand Up @@ -104,14 +108,14 @@ def resolve_index(self) -> None:
raise ValueError(f'{self.index} does not resolve to itself: {resp}')

@property
def getwaitstr(self) -> t.AnyStr:
def getwaitstr(self) -> str:
"""
Define the waitstr based on :py:attr:`action`
:getter: Returns the proper waitstr
:type: str
"""
retval = None
retval = ''
if self.action in self.HEALTH_ACTIONS:
retval = 'for index health to show green status'
return retval
6 changes: 3 additions & 3 deletions src/es_wait/relocate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class Relocate(Waiter):
def __init__(
self,
client: 'Elasticsearch',
pause: float = 9,
timeout: float = -1,
name: str = None,
pause: float = 9.0,
timeout: float = -1.0,
name: t.Optional[str] = None,
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout)
#: The index name
Expand Down
18 changes: 10 additions & 8 deletions src/es_wait/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ class Restore(Waiter):
def __init__(
self,
client: 'Elasticsearch',
pause: float = 9,
timeout: float = -1,
index_list: t.Sequence[str] = None,
pause: float = 9.0,
timeout: float = -1.0,
index_list: t.Optional[t.Sequence[str]] = None,
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout)
if not index_list:
index_list = []
#: The list of indices being restored
self.index_list = index_list
self.empty_check('index_list')
self.waitstr = 'for indices in index_list to be restored from snapshot'
logger.debug('Waiting %s...', self.waitstr)

@property
def index_list_chunks(self) -> t.Sequence[t.Sequence[t.AnyStr]]:
def index_list_chunks(self) -> t.Sequence[t.Sequence[str]]:
"""
This utility chunks very large index lists into 3KB chunks.
It measures the size as a csv string, then converts back into a list for the
Expand Down Expand Up @@ -78,7 +80,7 @@ def check(self) -> bool:
response = {}
for chunk in self.index_list_chunks:
chunk_response = self.get_recovery(chunk)
if chunk_response == {}:
if not chunk_response:
logger.debug('_recovery API returned an empty response. Trying again.')
return False
response.update(chunk_response)
Expand Down Expand Up @@ -106,11 +108,11 @@ def get_recovery(self, chunk: t.Sequence[str]) -> t.Dict:
:param chunk: A list of index names
"""
try:
chunk_response = self.client.indices.recovery(index=chunk, human=True)
chunk_response = dict(self.client.indices.recovery(index=chunk, human=True))
except Exception as err:
msg = (
f'Unable to obtain recovery information for specified indices. Error: '
f'{self.prettystr(err)}'
f'Unable to obtain recovery information for specified indices {chunk}. '
f'Error: {self.prettystr(err)}'
)
raise ValueError(msg) from err
return chunk_response
14 changes: 8 additions & 6 deletions src/es_wait/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class Snapshot(Waiter):
def __init__(
self,
client: 'Elasticsearch',
pause: float = 9,
timeout: float = -1,
snapshot: str = None,
repository: str = None,
pause: float = 9.0,
timeout: float = -1.0,
snapshot: str = '',
repository: str = '',
) -> None:
super().__init__(client=client, pause=pause, timeout=timeout)
#: The snapshot name
Expand Down Expand Up @@ -67,8 +67,10 @@ def snapstate(self) -> t.Dict:
"""
result = {}
try:
result = self.client.snapshot.get(
repository=self.repository, snapshot=self.snapshot
result = dict(
self.client.snapshot.get(
repository=self.repository, snapshot=self.snapshot
)
)
except Exception as err:
raise ValueError(
Expand Down
Loading

0 comments on commit e945ca5

Please sign in to comment.