-
Notifications
You must be signed in to change notification settings - Fork 8
/
elasticsearch_force_merge.py
executable file
·216 lines (196 loc) · 9.05 KB
/
elasticsearch_force_merge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/python
##-------------------------------------------------------------------
##
## File : elasticsearch_force_merge.py
## Author :
## Description :
## Run force merge for existing indices, which has many deleted documents
## Sample:
## - Run force-merge for indices which has many deleted records
## python ./elasticsearch_force_merge.py --min_deleted_count 1000 --min_deleted_ratio 0.1
##
## - Run force-merge for indices with matched index name
## python ./elasticsearch_force_merge.py --es_pattern_regexp "master-.*|staging-.*"
##
## - Run force-merge for all indices
## python ./elasticsearch_force_merge.py --min_deleted_count 0 --min_deleted_ratio 0
##
## --
## Created : <2017-02-24>
## Updated: Time-stamp: <2018-04-11 09:34:48>
##-------------------------------------------------------------------
import argparse
import requests
import sys
import socket
import json
import re
# Nagios-style exit codes: 0 = OK, 2 = CRITICAL (script is run as a check/cron job)
NAGIOS_OK_ERROR=0
NAGIOS_EXIT_ERROR=2
# Snapshot of the _cat/indices summary taken just before the force-merge loop;
# read by sys_exit() so before/after summaries can be logged on exit.
indices_before = ""
################################################################################
def setup_custom_logger(name):
    """Build a DEBUG-level logger that writes to /var/log/<name>.log and stdout.

    The log file is opened in 'w' mode, so each run starts with a fresh file.
    """
    import logging
    line_format = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
                                    datefmt='%Y-%m-%d %H:%M:%S')
    file_handler = logging.FileHandler("/var/log/%s.log" % (name), mode='w')
    file_handler.setFormatter(line_format)
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(line_format)
    custom_logger = logging.getLogger(name)
    custom_logger.setLevel(logging.DEBUG)
    custom_logger.addHandler(file_handler)
    custom_logger.addHandler(stdout_handler)
    return custom_logger
################################################################################
def get_es_health(es_host, es_port):
    """Return the cluster health status string from _cluster/health.

    Raises Exception when the REST call does not answer with HTTP 200.
    """
    # make sure es is green. And response to the query fast
    health_url = "http://%s:%s/_cluster/health?pretty" % (es_host, es_port)
    response = requests.get(health_url)
    if response.status_code != 200:
        raise Exception("Fail to run REST API: %s. Content: %s" % (health_url, response.content))
    # TODO: return false, if es cluster is too slow to response
    return json.loads(response.content)["status"]
def get_all_index_summary(es_host, es_port):
    """Return _cat/indices output with the header row and closed indices stripped."""
    url = "http://%s:%s/_cat/indices?v" % (es_host, es_port)
    r = requests.get(url)
    if r.status_code != 200:
        logger.error("Fail to run REST API: %s" % (url))
        sys_exit(es_host, es_port)
    # remove the header, and skip closed ES indices
    kept = [line for line in r.content.split("\n")
            if line != '' and " index " not in line and " close " not in line]
    return "\n".join(kept)
def print_index_setting(es_host, es_port, index_name):
    """Log the docs/merges/segments stats of one index from its _stats endpoint."""
    stats_url = "http://%s:%s/%s/_stats?pretty" % (es_host, es_port, index_name)
    response = requests.get(stats_url)
    if response.status_code != 200:
        logger.error("Fail to run REST API: %s" % (stats_url))
        sys_exit(es_host, es_port)
    primaries = json.loads(response.content)["_all"]["primaries"]
    logger.info("Index setting for %s.\n\tdocs:%s\n\tmerges:%s\n\tsegments:%s\n\n" %
                (index_name,
                 json.dumps(primaries["docs"]),
                 json.dumps(primaries["merges"]),
                 json.dumps(primaries["segments"])))
################################################################################
def sys_exit(es_host, es_port, exit_code = NAGIOS_EXIT_ERROR):
    # Terminate the script with exit_code, first logging index summaries so the
    # effect of the force-merge run (or the state at failure) is visible.
    # Reads the module-level globals `logger` and `indices_before`.
    if exit_code != 0:
        logger.error("Unexpected error has happened. Current summary of ES indices.")
    # indices_before is only set once the force-merge loop has started; skip
    # the "before" summary when exiting earlier than that.
    if indices_before != "":
        logger.info("Indices summary before force-merge.\n%s" % (indices_before))
    indices_after = get_all_index_summary(es_host, es_port)
    logger.info("Indices summary after force-merge.\n%s" % (indices_after))
    sys.exit(exit_code)
def get_es_index_info(es_host, es_port, es_pattern_regexp, \
                      min_deleted_count, min_deleted_ratio):
    '''
    Return [[index_name, total_doc_count, deleted_doc_count], ...] for open
    indices that match es_pattern_regexp (when non-empty) and pass both the
    min_deleted_count and min_deleted_ratio thresholds.

    Sample output:
    root@test:/# curl 172.17.0.8:9200/_cat/indices?v
    health status index pri rep docs.count docs.deleted store.size pri.store.size
    green open master-index-098f6bcd4621d373cade4e832627b4f6 1 0 1 0 8.1kb 8.1kb
    green open master-index-13a1f8adbec032ed68f3d035449ef48d 1 0 1 0 10.6kb 10.6kb
    ...
    ...
    '''
    index_list = []
    url = "http://%s:%s/_cat/indices?v" % (es_host, es_port)
    # TODO: error handling, if curl requests fails
    r = requests.get(url)
    if r.status_code != 200:
        logger.error("Fail to run REST API: %s" % (url))
        sys_exit(es_host, es_port)
    # TODO: use python library for ES
    for line in r.content.split("\n"):
        # remove the header, and skip closed ES indices
        if line == '' or " index " in line or " close " in line:
            continue
        # split() already collapses runs of whitespace; no pre-normalization needed
        l = line.split()
        index_name = l[2]
        # skip indices, if not in the matched pattern
        if es_pattern_regexp != "" and re.search(es_pattern_regexp, index_name) is None:
            continue
        total_doc_count = int(l[5])    # docs.count: live (non-deleted) docs
        deleted_doc_count = int(l[6])  # docs.deleted
        if deleted_doc_count < min_deleted_count:
            continue
        # BUGFIX: docs.count only counts live docs, so it can be 0 while
        # docs.deleted is large; the old code raised ZeroDivisionError here.
        # A 0-live index with enough deleted docs has an "infinite" ratio,
        # so it always passes the ratio check.
        if min_deleted_ratio != 0 and total_doc_count > 0 \
           and float(deleted_doc_count) / total_doc_count < min_deleted_ratio:
            continue
        index_list.append([index_name, total_doc_count, deleted_doc_count])
    return index_list
def force_merge_index(es_host, es_port, index_name):
    """Run a synchronous force-merge (only_expunge_deletes) on one index.

    Refuses to run (and exits) unless the cluster is green; logs the index's
    docs/merges/segments stats before and after so the merge effect is visible.
    """
    if get_es_health(es_host, es_port) != 'green':
        logger.error("ERROR: es is not green. Skip force merging for index(%s)" % (index_name))
        # CONSISTENCY FIX: every other error path in this file goes through
        # sys_exit() so the final indices summary gets logged; the original
        # bare sys.exit(1) skipped that. Exit code 1 is preserved.
        sys_exit(es_host, es_port, 1)
    print_index_setting(es_host, es_port, index_name)
    # TODO: Quit if something wrong; get time performance
    # force-merge is a sync call, and it might take a long time
    url = \
        "http://%s:%s/%s/_forcemerge?pretty&only_expunge_deletes=true" % \
        (es_host, es_port, index_name)
    r = requests.post(url)
    if r.status_code != 200:
        logger.error("Fail to run REST API: %s" % (url))
        sys_exit(es_host, es_port)
    logger.info("http response: %s" % (r.content))
    print_index_setting(es_host, es_port, index_name)
# Sample:
# python ./elasticsearch_force_merge.py --es_pattern_regexp "master-.*|staging-.*" \
# --min_deleted_count 1000 \
# --min_deleted_ratio 0.1
#
# python /tmp/elasticsearch_force_merge.py --min_deleted_count 0 --min_deleted_ratio 0
# NOTE: created at import time (not under __main__), so importing this module
# opens /var/log/myapp.log for writing — typically requires root.
logger = setup_custom_logger('myapp')
if __name__ == '__main__':
    # get parameters from users
    parser = argparse.ArgumentParser()
    parser.add_argument('--es_host', required=False, \
                        help="server ip or hostname for elasticsearch instance. Default value is ip of eth0", type=str)
    parser.add_argument('--es_port', default='9200', required=False, \
                        help="server port for elasticsearch instance", type=str)
    parser.add_argument('--es_pattern_regexp', required=False, default='', \
                        help="ES index name pattern. Only ES indices with matched pattern will be examined", type=str)
    parser.add_argument('--min_deleted_count', default=1000, required=False, \
                        help='If indices do not have too many deleted docs, skip the force merge', type=int)
    parser.add_argument('--min_deleted_ratio', default=0.05, required=False, \
                        help='If the ratio of deleted/total doc count is too small, skip the force merge', type=float)
    l = parser.parse_args()
    es_host = l.es_host
    es_port = l.es_port
    es_pattern_regexp = l.es_pattern_regexp
    min_deleted_count = int(l.min_deleted_count)
    min_deleted_ratio = float(l.min_deleted_ratio)
    # get ip of eth0, if es_host is not given: connecting a UDP socket to a
    # public address reveals the local outbound interface IP without sending
    # any packets.
    if es_host is None:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(("8.8.8.8", 80))
            es_host = s.getsockname()[0]
        finally:
            # BUGFIX: the socket was previously never closed (fd leak)
            s.close()
    try:
        indices_stats = get_all_index_summary(es_host, es_port)
        logger.info("Indices summary:\n%s" % (indices_stats))
        es_index_list = get_es_index_info(es_host, es_port, es_pattern_regexp, \
                                          min_deleted_count, min_deleted_ratio)
        if len(es_index_list) == 0:
            logger.info("OK: no indices need to run force-merge.")
        else:
            # snapshot for the before/after summary logged by sys_exit()
            indices_before = get_all_index_summary(es_host, es_port)
            updated_index_list = []
            for es_index in es_index_list:
                index_name = es_index[0]
                logger.info("Run force-merge for %s" % (index_name))
                force_merge_index(es_host, es_port, index_name)
                updated_index_list.append(index_name)
            logger.info("OK: Run force-merge successfully on below indices: %s" % (','.join(updated_index_list)))
        sys_exit(es_host, es_port, 0)
    except Exception as e:
        logger.error("Unexpected error:%s, %s" % (sys.exc_info()[0], e))
        sys_exit(es_host, es_port, 1)
## File : elasticsearch_force_merge.py ends