Skip to content

Commit

Permalink
Extend ElasticSearch and move elasticsearch version checks into extended class

Browse files Browse the repository at this point in the history
elasticsearch_test - test api calls against elasticsearch
  • Loading branch information
matsgoran committed Apr 4, 2019
1 parent a6ec405 commit a0e68fe
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 110 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ indent_size = 4
[Makefile]
indent_style = tab

[{*.json,*.yml}]
[{*.json,*.yml,*.yaml}]
indent_style = space
indent_size = 2
70 changes: 22 additions & 48 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ def __init__(self, args):
self.num_hits = 0
self.num_dupes = 0
self.current_es = None
self.current_es_addr = None
self.buffer_time = self.conf['buffer_time']
self.silence_cache = {}
self.rule_hashes = get_rule_hashes(self.conf, self.args.rule)
Expand All @@ -134,7 +133,6 @@ def __init__(self, args):
self.add_metadata_alert = self.conf.get('add_metadata_alert', False)

self.writeback_es = elasticsearch_client(self.conf)
self._es_version = None

remove = []
for rule in self.rules:
Expand All @@ -145,28 +143,6 @@ def __init__(self, args):
if self.args.silence:
self.silence()

def get_version(self):
info = self.writeback_es.info()
return info['version']['number']

@property
def es_version(self):
if self._es_version is None:
self._es_version = self.get_version()
return self._es_version

def is_atleastfive(self):
return int(self.es_version.split(".")[0]) >= 5

def is_atleastsix(self):
return int(self.es_version.split(".")[0]) >= 6

def is_atleastsixsix(self):
return float('.'.join(self.es_version.split(".")[:2])) >= 6.6

def is_atleastseven(self):
return int(self.es_version.split(".")[0]) >= 7

@staticmethod
def get_index(rule, starttime=None, endtime=None):
""" Gets the index for a rule. If strftime is set and starttime and endtime
Expand Down Expand Up @@ -289,7 +265,7 @@ def get_index_start(self, index, timestamp_field='@timestamp'):
"""
query = {'sort': {timestamp_field: {'order': 'asc'}}}
try:
if self.is_atleastsixsix():
if self.current_es.is_atleastsixsix():
# TODO use _source_includes=[...] instead when elasticsearch client supports this
res = self.current_es.search(index=index, size=1, body=query,
params={'_source_includes': timestamp_field}, ignore_unavailable=True)
Expand Down Expand Up @@ -366,7 +342,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
to_ts_func=rule['dt_to_ts'],
five=rule['five'],
)
if self.is_atleastsixsix():
if self.current_es.is_atleastsixsix():
# TODO fix when elasticsearch client supports param _source_includes
# Since _source_includes is not supported we must use params instead.
# the value object in _source_includes is not automagically parsed into a legal
Expand Down Expand Up @@ -395,7 +371,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
**extra_args
)

if self.is_atleastseven():
if self.current_es.is_atleastseven():
self.total_hits = int(res['hits']['total']['value'])
else:
self.total_hits = int(res['hits']['total'])
Expand Down Expand Up @@ -585,7 +561,7 @@ def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_
else:
payload = res['aggregations']

if self.is_atleastseven():
if self.current_es.is_atleastseven():
self.num_hits += res['hits']['total']['value']
else:
self.num_hits += res['hits']['total']
Expand Down Expand Up @@ -678,14 +654,14 @@ def get_starttime(self, rule):
"""
sort = {'sort': {'@timestamp': {'order': 'desc'}}}
query = {'filter': {'term': {'rule_name': '%s' % (rule['name'])}}}
if self.is_atleastfive():
if self.writeback_es.is_atleastfive():
query = {'query': {'bool': query}}
query.update(sort)

try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
index = self.get_six_index('elastalert_status')
if self.is_atleastsixsix():
if self.writeback_es.is_atleastsixsix():
# TODO use _source_includes=[...] instead when elasticsearch client supports this
res = self.writeback_es.search(index=index, doc_type='_doc', size=1, body=query,
params={'_source_includes': 'endtime,rule_name'})
Expand Down Expand Up @@ -843,7 +819,7 @@ def enhance_filter(self, rule):
else:
query = " OR ".join(additional_terms)
query_str_filter = {'query_string': {'query': query}}
if self.is_atleastfive():
if self.writeback_es.is_atleastfive():
filters.append(query_str_filter)
else:
filters.append({'query': query_str_filter})
Expand All @@ -859,7 +835,6 @@ def run_rule(self, rule, endtime, starttime=None):
"""
run_start = time.time()
self.current_es = elasticsearch_client(rule)
self.current_es_addr = (rule['es_host'], rule['es_port'])

# If there are pending aggregate matches, try processing them
for x in range(len(rule['agg_matches'])):
Expand Down Expand Up @@ -984,7 +959,7 @@ def init_rule(self, new_rule, new=True):
if 'top_count_keys' in new_rule and new_rule.get('raw_count_keys', True):
if self.string_multi_field_name:
string_multi_field_name = self.string_multi_field_name
elif self.is_atleastfive():
elif self.writeback_es.is_atleastfive():
string_multi_field_name = '.keyword'
else:
string_multi_field_name = '.raw'
Expand Down Expand Up @@ -1032,7 +1007,7 @@ def init_rule(self, new_rule, new=True):
def modify_rule_for_ES5(new_rule):
# Get ES version per rule
rule_es = elasticsearch_client(new_rule)
if int(rule_es.info()['version']['number'].split(".")[0]) >= 5:
if rule_es.is_atleastfive():
new_rule['five'] = True
else:
new_rule['five'] = False
Expand Down Expand Up @@ -1335,7 +1310,7 @@ def get_dashboard(self, rule, db_name):
raise EAException("use_kibana_dashboard undefined")
query = {'query': {'term': {'_id': db_name}}}
try:
if self.is_atleastsixsix():
if es.is_atleastsixsix():
# TODO use doc_type = _doc
# TODO use _source_includes=[...] instead when elasticsearch client supports for this
res = es.search(index='kibana-int', doc_type='dashboard', body=query,
Expand Down Expand Up @@ -1533,7 +1508,7 @@ def writeback(self, doc_type, body):
writeback_body['@timestamp'] = dt_to_ts(ts_now())

try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
writeback_index = self.get_six_index(doc_type)
res = self.writeback_es.index(index=writeback_index, doc_type='_doc', body=body)
else:
Expand All @@ -1554,13 +1529,13 @@ def find_recent_pending_alerts(self, time_limit):
time_filter = {'range': {'alert_time': {'from': dt_to_ts(ts_now() - time_limit),
'to': dt_to_ts(ts_now())}}}
sort = {'sort': {'alert_time': {'order': 'asc'}}}
if self.is_atleastfive():
if self.writeback_es.is_atleastfive():
query = {'query': {'bool': {'must': inner_query, 'filter': time_filter}}}
else:
query = {'query': inner_query, 'filter': time_filter}
query.update(sort)
try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
res = self.writeback_es.search(index=self.writeback_index, doc_type='_doc', body=query, size=1000)
else:
res = self.writeback_es.search(index=self.writeback_index, doc_type='elastalert', body=query, size=1000)
Expand Down Expand Up @@ -1593,7 +1568,6 @@ def send_pending_alerts(self):

# Set current_es for top_count_keys query
self.current_es = elasticsearch_client(rule)
self.current_es_addr = (rule['es_host'], rule['es_port'])

# Send the alert unless it's a future alert
if ts_now() > ts_to_dt(alert_time):
Expand All @@ -1616,7 +1590,7 @@ def send_pending_alerts(self):

# Delete it from the index
try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
self.writeback_es.delete(index=self.writeback_index, doc_type='_doc', id=_id)
else:
self.writeback_es.delete(index=self.writeback_index, doc_type='elastalert', id=_id)
Expand Down Expand Up @@ -1649,15 +1623,15 @@ def get_aggregated_matches(self, _id):
query = {'query': {'query_string': {'query': 'aggregate_id:%s' % (_id)}}, 'sort': {'@timestamp': 'asc'}}
matches = []
try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
res = self.writeback_es.search(index=self.writeback_index, doc_type='_doc', body=query,
size=self.max_aggregation)
else:
res = self.writeback_es.search(index=self.writeback_index, doc_type='elastalert', body=query,
size=self.max_aggregation)
for match in res['hits']['hits']:
matches.append(match['_source'])
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
self.writeback_es.delete(index=self.writeback_index, doc_type='_doc', id=match['_id'])
else:
self.writeback_es.delete(index=self.writeback_index, doc_type='elastalert', id=match['_id'])
Expand All @@ -1672,11 +1646,11 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
'must_not': [{'exists': {'field': 'aggregate_id'}}]}}}
if aggregation_key_value:
query['filter']['bool']['must'].append({'term': {'aggregation_key': aggregation_key_value}})
if self.is_atleastfive():
if self.writeback_es.is_atleastfive():
query = {'query': {'bool': query}}
query['sort'] = {'alert_time': {'order': 'desc'}}
try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
res = self.writeback_es.search(index=self.writeback_index, doc_type='_doc', body=query, size=1)
else:
res = self.writeback_es.search(index=self.writeback_index, doc_type='elastalert', body=query, size=1)
Expand Down Expand Up @@ -1811,16 +1785,16 @@ def is_silenced(self, rule_name):
return False
query = {'term': {'rule_name': rule_name}}
sort = {'sort': {'until': {'order': 'desc'}}}
if self.is_atleastfive():
if self.writeback_es.is_atleastfive():
query = {'query': query}
else:
query = {'filter': query}
query.update(sort)

try:
if self.is_atleastsix():
if self.writeback_es.is_atleastsix():
index = self.get_six_index('silence')
if self.is_atleastsixsix():
if self.writeback_es.is_atleastsixsix():
# TODO use _source_includes=[...] instead when elasticsearch client supports this
res = self.writeback_es.search(index=index, doc_type='_doc',
size=1, body=query, params={'_source_includes': 'until,exponent'})
Expand Down
44 changes: 44 additions & 0 deletions elastalert/elasticsearchclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, RequestsHttpConnection


class ElasticSearchClient(Elasticsearch):
    """Elasticsearch client that caches the server version and exposes
    convenience predicates for version-dependent API behaviour.

    Built from an es_conn_config dict (see util.build_es_conn_config); the
    original config is retained and exposed via the ``conf`` property.
    """

    def __init__(self, conf):
        """
        :param conf: es_conn_config dict holding host/port, TLS, auth,
            url prefix and timeout settings used to configure the
            underlying ``Elasticsearch`` connection.
        """
        super(ElasticSearchClient, self).__init__(host=conf['es_host'],
                                                  port=conf['es_port'],
                                                  url_prefix=conf['es_url_prefix'],
                                                  use_ssl=conf['use_ssl'],
                                                  verify_certs=conf['verify_certs'],
                                                  ca_certs=conf['ca_certs'],
                                                  connection_class=RequestsHttpConnection,
                                                  http_auth=conf['http_auth'],
                                                  timeout=conf['es_conn_timeout'],
                                                  send_get_body_as=conf['send_get_body_as'],
                                                  client_cert=conf['client_cert'],
                                                  client_key=conf['client_key'])
        self._conf = conf
        # Lazily fetched and cached on first access of es_version.
        self._es_version = None

    @property
    def conf(self):
        """The es_conn_config dict this client was created from."""
        return self._conf

    @property
    def es_version(self):
        """Server version string (e.g. ``'6.6.1'``), fetched once via
        the cluster info API and cached for the client's lifetime."""
        if self._es_version is None:
            self._es_version = self.info()['version']['number']
        return self._es_version

    def _version_tuple(self, parts):
        """Return the first *parts* numeric components of es_version as a
        tuple of ints, so versions compare correctly (e.g. 6.10 >= 6.6)."""
        return tuple(int(p) for p in self.es_version.split(".")[:parts])

    def is_atleastfive(self):
        """True if the server major version is 5 or newer."""
        return self._version_tuple(1) >= (5,)

    def is_atleastsix(self):
        """True if the server major version is 6 or newer."""
        return self._version_tuple(1) >= (6,)

    def is_atleastsixsix(self):
        """True if the server version is 6.6 or newer."""
        return self._version_tuple(2) >= (6, 6)

    def is_atleastseven(self):
        """True if the server major version is 7 or newer."""
        return self._version_tuple(1) >= (7,)
18 changes: 3 additions & 15 deletions elastalert/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import dateutil.parser
import dateutil.tz
from auth import Auth
from elasticsearch import RequestsHttpConnection
from elasticsearch.client import Elasticsearch
import elasticsearchclient
from six import string_types

logging.basicConfig()
Expand Down Expand Up @@ -281,7 +280,7 @@ def replace_dots_in_field_names(document):


def elasticsearch_client(conf):
""" returns an Elasticsearch instance configured using an es_conn_config """
""" returns an ElasticsearchClient instance configured using an es_conn_config """
es_conn_conf = build_es_conn_config(conf)
auth = Auth()
es_conn_conf['http_auth'] = auth(host=es_conn_conf['es_host'],
Expand All @@ -290,18 +289,7 @@ def elasticsearch_client(conf):
aws_region=es_conn_conf['aws_region'],
profile_name=es_conn_conf['profile'])

return Elasticsearch(host=es_conn_conf['es_host'],
port=es_conn_conf['es_port'],
url_prefix=es_conn_conf['es_url_prefix'],
use_ssl=es_conn_conf['use_ssl'],
verify_certs=es_conn_conf['verify_certs'],
ca_certs=es_conn_conf['ca_certs'],
connection_class=RequestsHttpConnection,
http_auth=es_conn_conf['http_auth'],
timeout=es_conn_conf['es_conn_timeout'],
send_get_body_as=es_conn_conf['send_get_body_as'],
client_cert=es_conn_conf['client_cert'],
client_key=es_conn_conf['client_key'])
return elasticsearchclient.ElasticSearchClient(es_conn_conf)


def build_es_conn_config(conf):
Expand Down
31 changes: 15 additions & 16 deletions tests/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,14 +1324,13 @@ def test_query_with_whitelist_filter_es(ea):
in new_rule['filter'][-1]['query']['query_string']['query']


def test_query_with_whitelist_filter_es_five(ea):
ea.es_version = '6.2'
ea.rules[0]['_source_enabled'] = False
ea.rules[0]['filter'] = [{'query_string': {'query': 'baz'}}]
ea.rules[0]['compare_key'] = "username"
ea.rules[0]['whitelist'] = ['xudan1', 'xudan12', 'aa1', 'bb1']
new_rule = copy.copy(ea.rules[0])
ea.init_rule(new_rule, True)
def test_query_with_whitelist_filter_es_five(ea_sixsix):
    """Whitelisted values must be excluded via NOT clauses in the
    query_string filter generated by init_rule."""
    rule = ea_sixsix.rules[0]
    rule['_source_enabled'] = False
    rule['filter'] = [{'query_string': {'query': 'baz'}}]
    rule['compare_key'] = "username"
    rule['whitelist'] = ['xudan1', 'xudan12', 'aa1', 'bb1']
    new_rule = copy.copy(rule)
    ea_sixsix.init_rule(new_rule, True)
    expected = 'NOT username:"xudan1" AND NOT username:"xudan12" AND NOT username:"aa1"'
    assert expected in new_rule['filter'][-1]['query_string']['query']
Expand All @@ -1347,13 +1346,13 @@ def test_query_with_blacklist_filter_es(ea):
new_rule['filter'][-1]['query']['query_string']['query']


def test_query_with_blacklist_filter_es_five(ea):
ea.es_version = '6.2'
ea.rules[0]['_source_enabled'] = False
ea.rules[0]['filter'] = [{'query_string': {'query': 'baz'}}]
ea.rules[0]['compare_key'] = "username"
ea.rules[0]['blacklist'] = ['xudan1', 'xudan12', 'aa1', 'bb1']
new_rule = copy.copy(ea.rules[0])
ea.init_rule(new_rule, True)
def test_query_with_blacklist_filter_es_five(ea_sixsix):
    """Blacklisted values must appear OR-joined in the query_string
    filter generated by init_rule."""
    ea_sixsix.rules[0]['_source_enabled'] = False
    ea_sixsix.rules[0]['filter'] = [{'query_string': {'query': 'baz'}}]
    ea_sixsix.rules[0]['compare_key'] = "username"
    # The original assigned 'blacklist' twice with identical values;
    # the redundant duplicate assignment was removed.
    ea_sixsix.rules[0]['blacklist'] = ['xudan1', 'xudan12', 'aa1', 'bb1']
    new_rule = copy.copy(ea_sixsix.rules[0])
    ea_sixsix.init_rule(new_rule, True)
    assert 'username:"xudan1" OR username:"xudan12" OR username:"aa1"' in new_rule['filter'][-1]['query_string'][
        'query']
Loading

0 comments on commit a0e68fe

Please sign in to comment.