diff --git a/Default.sublime-commands b/Default.sublime-commands index c04376f..4df46f8 100644 --- a/Default.sublime-commands +++ b/Default.sublime-commands @@ -110,4 +110,7 @@ {"command": "show_active_server", "caption": "Elasticsearch: Show Active Server"}, {"command": "reindex", "caption": "Elasticsearch: Reindex"}, + {"command": "dumpdata", "caption": "Elasticsearch: Dump Data"}, + {"command": "loaddata", "caption": "Elasticsearch: Load Data"}, + {"command": "copy_index", "caption": "Elasticsearch: Copy Data from ..."}, ] diff --git a/Elasticsearch.sublime-settings b/Elasticsearch.sublime-settings index 6ce1687..9b7030b 100644 --- a/Elasticsearch.sublime-settings +++ b/Elasticsearch.sublime-settings @@ -4,8 +4,6 @@ "base_url": "http://localhost:9200/", "index": "test", "doc_type": "test", - "analyzer": "default", - "hettp_headers": {} } }, // Path to the Apache Bench (ab) command. If ab is on you path, you should not need to @@ -19,6 +17,5 @@ "pretty_command": "pretty_json", "pretty_syntax": "Elasticsearch", - "ask_to_search_types": false, - "backup_location": "" + "fixture_dir": "" } diff --git a/Lib/elasticsearch/__init__.py b/Lib/elasticsearch/__init__.py index 2d1ab76..9efd0a9 100644 --- a/Lib/elasticsearch/__init__.py +++ b/Lib/elasticsearch/__init__.py @@ -47,7 +47,7 @@ def create(self, index, doc_type, body, id=None, params=None, command=None): return self.index(index, doc_type, body, id, params=params, command=command) def index(self, index, doc_type, body, id=None, params=None, command=None): - method = 'POST' if id is not None else 'PUT' + method = 'PUT' if id else 'POST' result = self.request( method, make_path(index, doc_type, id), body=body, params=params) return show_result_json(result.json(), command=command) diff --git a/Lib/elasticsearch/helpers.py b/Lib/elasticsearch/helpers.py index fe2b024..50dd603 100644 --- a/Lib/elasticsearch/helpers.py +++ b/Lib/elasticsearch/helpers.py @@ -1,9 +1,64 @@ +import gzip import json import sublime +from itertools import islice +from operator import methodcaller + from .utils import show_result_json from .utils import make_url +def change_index(hits, index): + for hit in hits: + hit['_index'] = index + yield hit + + +def expand_action(data): + data = data.copy() + action = {'index': {}} + for key in ('_index', '_parent', '_percolate', '_routing', '_timestamp', + '_ttl', '_type', '_version', '_version_type', '_id', + '_retry_on_conflict'): + if key in data: + action['index'][key] = data.pop(key) + + return action, data.get('_source', data) + + +def bulk_index(client, docs, chunk_size=500, **kwargs): + success, failed = 0, 0 + errors = [] + actions = map(expand_action, docs) + + while True: + chunk = islice(actions, chunk_size) + + bulk_actions = [] + for action, data in chunk: + bulk_actions.append(json.dumps(action)) + if data is not None: + bulk_actions.append(json.dumps(data)) + + if not bulk_actions: + break + + body = "\n".join(bulk_actions) + "\n" + response = client.bulk(body, **kwargs) + + for op_type, item in map(methodcaller('popitem'), response['items']): + ok = 200 <= item.get('status', 500) < 300 + if not ok: + errors.append(item) + failed += 1 + else: + success += 1 + + sublime.status_message("docs: {}".format(success + failed)) + + return success, failed if not errors else errors + + def scan(client, query=None, scroll='5m', **kwargs): kwargs['params'] = dict(search_type='scan', scroll=scroll) @@ -36,7 +91,7 @@ def scan(client, query=None, scroll='5m', **kwargs): def reindex(client, source_index, target_index, query=None, target_client=None, - scroll='5m', scan_kwargs={}, command=None): + chunk_size=500, scroll='5m', scan_kwargs={}, command=None): target_client = client if target_client is None else target_client @@ -48,19 +103,58 @@ def reindex(client, source_index, target_index, query=None, target_client=None, docs = scan(client, query=query, index=source_index, scroll=scroll, **scan_kwargs) - success, failed = 0, 0 + success, failed = bulk_index( + target_client, change_index(docs, target_index), chunk_size=chunk_size) - for doc in docs: - response = target_client.index( - index=target_index, doc_type=doc['_type'], - body=json.dumps(doc['_source']), id=doc['_id']) + result['success'] = success + result['failed'] = failed + + show_result_json(result, sort_keys=True, command=command) + +copyindex = reindex + + +def dumpdata(outputfile, client, index, query=None, + scroll='5m', scan_kwargs={}, command=None): + + result = { + "_source": make_url(client.base_url, index), + "_output": outputfile + } + + docs = scan(client, query=query, index=index, + scroll=scroll, **scan_kwargs) + + count = 0 + with gzip.open(outputfile, 'wb') as f: + for doc in docs: + del doc['_score'] + + data = "{}\n".format(json.dumps(doc, ensure_ascii=False)) + f.write(bytes(data, 'utf-8')) + + count += 1 + sublime.status_message("dumps: {}".format(count)) + + result['docs'] = count + + show_result_json(result, sort_keys=True, command=command) + + +def loaddata(inputfile, client, index, chunk_size=500, command=None): + result = { + "_target": make_url(client.base_url, index), + "_input": inputfile + } - if 'error' in response.keys(): - failed += 1 - else: - success += 1 + docs = [] + with gzip.open(inputfile, 'rb') as f: + for doc in f: + docs.append(json.loads(doc.decode('utf-8'))) + sublime.status_message("read: {}".format(len(docs))) - sublime.status_message("{0:_>10}".format(success + failed)) + success, failed = bulk_index( + client, change_index(docs, index), chunk_size=chunk_size) result['success'] = success result['failed'] = failed diff --git a/README.md b/README.md index fbec2cc..378c7ab 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Setting | Description ``enabled_pretty`` | enabled pretty json. required: [PrettyJson](https://github.com/dzhibas/SublimePrettyJson) ``pretty_command`` | pretty format command. default: ``pretty_json`` ``pretty_syntax`` | pretty json target syntax. default: ``Elasticsearch`` -``backup_location`` | Path to the Dump Data & Load Data command. +``fixture_dir`` | Path to the Dump Data & Load Data command. #### servers.\* @@ -238,9 +238,12 @@ open the Command Palette (``Shift + Command + P``) and enter ``Elasticsearch ... * Elasticsearch: Apache Bench * Elasticsearch: Apache Bench for Search Template -### Commmand for Helpers +### Commmand for Index Data Management. * Elasticsearch: Reindex +* Elasticsearch: Dump Data +* Elasticsearch: Load Data +* Elasticsearch: Copy Data from ... ### Command for Sublime User Settings diff --git a/helpers.py b/helpers.py index 120271c..693a788 100644 --- a/helpers.py +++ b/helpers.py @@ -1,42 +1,136 @@ +import os.path +import glob +import datetime +import sublime + from .base import ElasticsearchCommand + from elasticsearch import Elasticsearch from elasticsearch.helpers import reindex +from elasticsearch.helpers import dumpdata +from elasticsearch.helpers import loaddata +from elasticsearch.helpers import copyindex class HelperBaseCommand(ElasticsearchCommand): - selected_index = 0 - - def select_server(self, callback): - self.server_choices = list(self.servers.keys()) - self.server_choices.sort() - self.window.show_quick_panel( - self.server_choices, self.on_done, - selected_index=self.selected_index) + show_result_on_window = False + syntax = 'Packages/JavaScript/JSON.tmLanguage' + comfirm_message = None + comfirm_ok_title = "OK" - def get_server_settings(self, index): - selected = self.server_choices[index] - return self.servers[selected] + def is_comfirmed(self): + ok = sublime.ok_cancel_dialog( + "{}\n\nTarget: {}".format( + self.comfirm_message, self.active_server), + self.comfirm_ok_title) + if ok: + return True + sublime.status_message('Canceled .') + return False class ReindexCommand(HelperBaseCommand): result_window_title = "Reindex" - show_result_on_window = False - syntax = 'Packages/JavaScript/JSON.tmLanguage' + comfirm_message = 'Are you sure you want to reindex ?' + comfirm_ok_title = 'Reindex' - def run(self): - self.select_server(self.on_done) + def run(self, chunk_size=1000): + if not self.is_comfirmed(): + return + + self.request(reindex, self.esclient, source_index=self.index, + chunk_size=chunk_size, target_index=self.index) + + +class CopyIndexCommand(HelperBaseCommand): + result_window_title = "Copy Index Data" + comfirm_message = 'Are you sure you want to copy data ?' + comfirm_ok_title = 'Copy' + + @property + def select_items(self): + items = list(self.servers.keys()) + items.sort(reverse=True) + return items + + def show_select_items(self, callback): + self.window.show_quick_panel(self.select_items, callback) + + def run(self, chunk_size=1000): + self.chunk_size = chunk_size + self.show_select_items(self.on_done) + + def source_esclient(self, index): + s = self.servers[self.select_items[index]] + return Elasticsearch(s.get('base_url'), s.get('http_headers')) + + def source_index(self, index): + s = self.servers[self.select_items[index]] + return s.get('index') def on_done(self, index): if index == -1: return - self.selected_index = index + if not self.is_comfirmed(): + return + + source_client = self.source_esclient(index) + source_index = self.source_index(index) + + self.request(copyindex, source_client, source_index, + target_client=self.esclient, target_index=self.index, + chunk_size=self.chunk_size) - server = self.get_server_settings(index) - base_url = server.get('base_url') - headers = server.get('http_headers') - index = server.get('index') - client = Elasticsearch(base_url, headers) - self.request(reindex, client, source_index=index, - target_index=index) +class DumpdataCommand(HelperBaseCommand): + result_window_title = "Dump Data" + comfirm_message = 'Are you sure you want to dump data ?' + comfirm_ok_title = 'Dump data' + + @property + def outputfile(self): + now = datetime.datetime.now() + filename = "dump-{0}-{1:%Y%m%d%H%M%S}.gz".format(self.index, now) + return os.path.join(self.fixture_dir, filename) + + def is_enabled(self): + return os.path.isdir(self.fixture_dir) + + def run(self, scroll='5m'): + if not self.is_comfirmed(): + return + + self.request(dumpdata, self.outputfile, + self.esclient, self.index, scroll=scroll) + + +class LoaddataCommand(HelperBaseCommand): + result_window_title = "Load Data" + comfirm_message = 'Are you sure you want to load data ?' + comfirm_ok_title = 'Load data' + + def is_enabled(self): + return os.path.isdir(self.fixture_dir) + + @property + def select_items(self): + items = glob.glob(os.path.join(self.fixture_dir, 'dump-*.gz')) + items.sort(reverse=True) + return items + + def show_select_items(self, callback): + self.window.show_quick_panel(self.select_items, callback) + + def run(self): + self.show_select_items(self.on_done) + + def on_done(self, index): + if index == -1: + return + + if not self.is_comfirmed(): + return + + inputfile = self.select_items[index] + self.request(loaddata, inputfile, self.esclient, self.index)