Skip to content

Commit

Permalink
Test create_index in elasticsearch with bugfixes
Browse files Browse the repository at this point in the history
elasticsearch:
 - 5.6.16
 - 6.3.2
 - 6.6.2
 - 7.0.0-rc1
  • Loading branch information
matsgoran committed Apr 1, 2019
1 parent b942cb0 commit c0054e2
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 111 deletions.
17 changes: 12 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,23 @@ script: make test
jobs:
include:
- stage: "Elasticsearch"
env:
- TOXENV=py27 ES_VERSION=7.0.0-rc1
install:
- pip install tox
- wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}-linux-x86_64.tar.gz
- tar -xzf elasticsearch-${ES_VERSION}-linux-x86_64.tar.gz
- wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz
- mkdir elasticsearch-${ES_VERSION} && tar -xzf elasticsearch-${ES_VERSION}.tar.gz -C elasticsearch-${ES_VERSION} --strip-components=1
- ./elasticsearch-${ES_VERSION}/bin/elasticsearch &
script:
- wget -q --waitretry=1 --retry-connrefused --tries=30 -O - http://127.0.0.1:9200
- make test
- make test-elasticsearch
name: "Elasticsearch 7"
env: TOXENV=py27 ES_VERSION=7.0.0-rc1-linux-x86_64
- name: "Elasticsearch 6.6"
env: TOXENV=py27 ES_VERSION=6.6.2
- name: "Elasticsearch 6.3"
env: TOXENV=py27 ES_VERSION=6.3.2
- name: "Elasticsearch 5.6"
env: TOXENV=py27 ES_VERSION=5.6.16

deploy:
provider: pypi
user: yelplabs
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ install-hooks:
test:
tox

test-elasticsearch:
tox -- --runelasticsearch

test-docker:
docker-compose --project-name elastalert build tox
docker-compose --project-name elastalert run tox
Expand Down
189 changes: 103 additions & 86 deletions elastalert/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,101 @@
env = Env(ES_USE_SSL=bool)


def main():
def main(es_client, ea_index, recreate=False, old_ea_index=None):
esversion = es_client.info()["version"]["number"]
print("Elastic Version: " + esversion)
elasticversion = int(esversion.split(".")[0])

es_index_mappings = read_es_index_mappings() if elasticversion > 5 else read_es_index_mappings(5)

es_index = IndicesClient(es_client)
if not recreate:
if es_index.exists(ea_index):
print('Index ' + ea_index + ' already exists. Skipping index creation.')
return None

# (Re-)Create indices.
if (elasticversion > 5):
index_names = (
ea_index,
ea_index + '_status',
ea_index + '_silence',
ea_index + '_error',
ea_index + '_past',
)
else:
index_names = (
ea_index,
)
for index_name in index_names:
if es_index.exists(index_name):
print('Deleting index ' + index_name + '.')
try:
es_index.delete(index_name)
except NotFoundError:
# Why does this ever occur?? It shouldn't. But it does.
pass
es_index.create(index_name)

# To avoid a race condition. TODO: replace this with a real check
time.sleep(2)

if elasticversion > 5:
# TODO remove doc_type for elasticsearch >= 7 when elastic client supports doc_type=None
params = {'include_type_name': 'true'} if elasticversion > 6 else {}

es_client.indices.put_mapping(index=ea_index, doc_type='_doc',
body=es_index_mappings['elastalert'], params=params)
es_client.indices.put_mapping(index=ea_index + '_status', doc_type='_doc',
body=es_index_mappings['elastalert_status'], params=params)
es_client.indices.put_mapping(index=ea_index + '_silence', doc_type='_doc',
body=es_index_mappings['silence'], params=params)
es_client.indices.put_mapping(index=ea_index + '_error', doc_type='_doc',
body=es_index_mappings['elastalert_error'], params=params)
es_client.indices.put_mapping(index=ea_index + '_past', doc_type='_doc',
body=es_index_mappings['past_elastalert'], params=params)
else:
es_client.indices.put_mapping(index=ea_index, doc_type='elastalert',
body=es_index_mappings['elastalert'])
es_client.indices.put_mapping(index=ea_index, doc_type='elastalert_status',
body=es_index_mappings['elastalert_status'])
es_client.indices.put_mapping(index=ea_index, doc_type='silence',
body=es_index_mappings['silence'])
es_client.indices.put_mapping(index=ea_index, doc_type='elastalert_error',
body=es_index_mappings['elastalert_error'])
es_client.indices.put_mapping(index=ea_index, doc_type='past_elastalert',
body=es_index_mappings['past_elastalert'])

print('New index %s created' % ea_index)
if old_ea_index:
print("Copying all data from old index '{0}' to new index '{1}'".format(old_ea_index, ea_index))
# Use the defaults for chunk_size, scroll, scan_kwargs, and bulk_kwargs
elasticsearch.helpers.reindex(es_client, old_ea_index, ea_index)

print('Done!')


def read_es_index_mappings(es_version=6):
print('Reading Elastic {0} index mappings:'.format(es_version))
return {
'silence': read_es_index_mapping('silence', es_version),
'elastalert_status': read_es_index_mapping('elastalert_status', es_version),
'elastalert': read_es_index_mapping('elastalert', es_version),
'past_elastalert': read_es_index_mapping('past_elastalert', es_version),
'elastalert_error': read_es_index_mapping('elastalert_error', es_version)
}


def read_es_index_mapping(mapping, es_version=6):
base_path = os.path.abspath(os.path.dirname(__file__))
mapping_path = 'es_mappings/{0}/{1}.json'.format(es_version, mapping)
path = os.path.join(base_path, mapping_path)
with open(path, 'r') as f:
print("Reading index mapping '{0}'".format(mapping_path))
return json.load(f)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--host', default=os.environ.get('ES_HOST', None), help='Elasticsearch host')
parser.add_argument('--port', default=os.environ.get('ES_PORT', None), type=int, help='Elasticsearch port')
Expand All @@ -31,10 +125,12 @@ def main():
parser.add_argument('--ssl', action='store_true', default=env('ES_USE_SSL', None), help='Use TLS')
parser.add_argument('--no-ssl', dest='ssl', action='store_false', help='Do not use TLS')
parser.add_argument('--verify-certs', action='store_true', default=None, help='Verify TLS certificates')
parser.add_argument('--no-verify-certs', dest='verify_certs', action='store_false', help='Do not verify TLS certificates')
parser.add_argument('--no-verify-certs', dest='verify_certs', action='store_false',
help='Do not verify TLS certificates')
parser.add_argument('--index', help='Index name to create')
parser.add_argument('--old-index', help='Old index name to copy')
parser.add_argument('--send_get_body_as', default='GET', help='Method for querying Elasticsearch - POST, GET or source')
parser.add_argument('--send_get_body_as', default='GET',
help='Method for querying Elasticsearch - POST, GET or source')
parser.add_argument(
'--boto-profile',
default=None,
Expand All @@ -50,7 +146,8 @@ def main():
help='AWS Region to use for signing requests. Optionally use the AWS_DEFAULT_REGION environment variable')
parser.add_argument('--timeout', default=60, help='Elasticsearch request timeout')
parser.add_argument('--config', default='config.yaml', help='Global config file (default: config.yaml)')
parser.add_argument('--recreate', type=bool, default=False, help='Force re-creation of the index (this will cause data loss).')
parser.add_argument('--recreate', type=bool, default=False,
help='Force re-creation of the index (this will cause data loss).')
args = parser.parse_args()

if os.path.isfile(args.config):
Expand Down Expand Up @@ -104,6 +201,7 @@ def main():
else raw_input('Name of existing index to copy? (Default None) '))

timeout = args.timeout

auth = Auth()
http_auth = auth(host=host,
username=username,
Expand All @@ -124,85 +222,4 @@ def main():
ca_certs=ca_certs,
client_key=client_key)

esversion = es.info()["version"]["number"]
print("Elastic Version:" + esversion)
elasticversion = int(esversion.split(".")[0])

es_index_mappings = read_es_index_mappings() if elasticversion > 5 else read_es_index_mappings(5)

es_index = IndicesClient(es)
if not args.recreate:
if es_index.exists(index):
print('Index ' + index + ' already exists. Skipping index creation.')
return None

# (Re-)Create indices.
if (elasticversion > 5):
index_names = (
index,
index + '_status',
index + '_silence',
index + '_error',
index + '_past',
)
else:
index_names = (
index,
)
for index_name in index_names:
if es_index.exists(index_name):
print('Deleting index ' + index_name + '.')
try:
es_index.delete(index_name)
except NotFoundError:
# Why does this ever occur?? It shouldn't. But it does.
pass
es_index.create(index_name)

# To avoid a race condition. TODO: replace this with a real check
time.sleep(2)

if elasticversion > 5:
es.indices.put_mapping(index=index, doc_type='_doc', body=es_index_mappings['elastalert'])
es.indices.put_mapping(index=index + '_status', doc_type='_doc', body=es_index_mappings['elastalert_status'])
es.indices.put_mapping(index=index + '_silence', doc_type='_doc', body=es_index_mappings['silence'])
es.indices.put_mapping(index=index + '_error', doc_type='_doc', body=es_index_mappings['elastalert_error'])
es.indices.put_mapping(index=index + '_past', doc_type='_doc', body=es_index_mappings['past_elastalert'])
else:
es.indices.put_mapping(index=index, doc_type='elastalert', body=es_index_mappings['elastalert'])
es.indices.put_mapping(index=index, doc_type='elastalert_status', body=es_index_mappings['elastalert_status'])
es.indices.put_mapping(index=index, doc_type='silence', body=es_index_mappings['silence'])
es.indices.put_mapping(index=index, doc_type='elastalert_error', body=es_index_mappings['elastalert_error'])
es.indices.put_mapping(index=index, doc_type='past_elastalert', body=es_index_mappings['past_elastalert'])

print('New index %s created' % index)
if old_index:
print("Copying all data from old index '{0}' to new index '{1}'".format(old_index, index))
# Use the defaults for chunk_size, scroll, scan_kwargs, and bulk_kwargs
elasticsearch.helpers.reindex(es, old_index, index)

print('Done!')


def read_es_index_mappings(es_version=6):
print('Reading Elastic {0} index mappings:'.format(es_version))
return {
'silence': read_es_index_mapping('silence', es_version),
'elastalert_status': read_es_index_mapping('elastalert_status', es_version),
'elastalert': read_es_index_mapping('elastalert', es_version),
'past_elastalert': read_es_index_mapping('past_elastalert', es_version),
'elastalert_error': read_es_index_mapping('elastalert_error', es_version)
}


def read_es_index_mapping(mapping, es_version=6):
base_path = os.path.abspath(os.path.dirname(__file__))
mapping_path = 'es_mappings/{0}/{1}.json'.format(es_version, mapping)
path = os.path.join(base_path, mapping_path)
with open(path, 'r') as f:
print("Reading index mapping '{0}'".format(mapping_path))
return json.load(f)


if __name__ == '__main__':
main()
main(es_client=es, ea_index=index, recreate=args.recreate, old_ea_index=old_index)
6 changes: 4 additions & 2 deletions elastalert/es_mappings/5/elastalert.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"elastalert": {
"properties": {
"rule_name": {
"type": "keyword"
"index": "not_analyzed",
"type": "string"
},
"@timestamp": {
"type": "date",
Expand All @@ -21,7 +22,8 @@
"enabled": "false"
},
"aggregate_id": {
"type": "keyword"
"index": "not_analyzed",
"type": "string"
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion elastalert/es_mappings/5/elastalert_status.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"elastalert_status": {
"properties": {
"rule_name": {
"type": "keyword"
"index": "not_analyzed",
"type": "string"
},
"@timestamp": {
"type": "date",
Expand Down
6 changes: 4 additions & 2 deletions elastalert/es_mappings/5/past_elastalert.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"past_elastalert": {
"properties": {
"rule_name": {
"type": "keyword"
"index": "not_analyzed",
"type": "string"
},
"match_body": {
"type": "object",
Expand All @@ -13,7 +14,8 @@
"format": "dateOptionalTime"
},
"aggregate_id": {
"type": "keyword"
"index": "not_analyzed",
"type": "string"
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion elastalert/es_mappings/5/silence.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"silence": {
"properties": {
"rule_name": {
"type": "keyword"
"index": "not_analyzed",
"type": "string"
},
"until": {
"type": "date",
Expand Down
6 changes: 2 additions & 4 deletions elastalert/es_mappings/6/elastalert.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{
"properties": {
"rule_name": {
"index": "not_analyzed",
"type": "string"
"type": "keyword"
},
"@timestamp": {
"type": "date",
Expand All @@ -21,8 +20,7 @@
"enabled": "false"
},
"aggregate_id": {
"index": "not_analyzed",
"type": "string"
"type": "keyword"
}
}
}
3 changes: 1 addition & 2 deletions elastalert/es_mappings/6/elastalert_status.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{
"properties": {
"rule_name": {
"index": "not_analyzed",
"type": "string"
"type": "keyword"
},
"@timestamp": {
"type": "date",
Expand Down
6 changes: 2 additions & 4 deletions elastalert/es_mappings/6/past_elastalert.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{
"properties": {
"rule_name": {
"index": "not_analyzed",
"type": "string"
"type": "keyword"
},
"match_body": {
"type": "object",
Expand All @@ -13,8 +12,7 @@
"format": "dateOptionalTime"
},
"aggregate_id": {
"index": "not_analyzed",
"type": "string"
"type": "keyword"
}
}
}
3 changes: 1 addition & 2 deletions elastalert/es_mappings/6/silence.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
{
"properties": {
"rule_name": {
"index": "not_analyzed",
"type": "string"
"type": "keyword"
},
"until": {
"type": "date",
Expand Down
2 changes: 1 addition & 1 deletion elastalert/rule_from_kibana.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def main():
query = {'query': {'term': {'_id': db_name}}}

if is_atleastsixsix(es_version):
# TODO add support for kibana 5
# TODO check support for kibana 7
# TODO use doc_type='_doc' instead
# TODO use _source_includes=[...] instead when elasticsearch client supports this
res = es.search(index='kibana-int', doc_type='dashboard', body=query, params={'_source_includes': 'dashboard'})
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[pytest]
markers =
elasticsearch: mark a test as using elasticsearch.
Loading

0 comments on commit c0054e2

Please sign in to comment.