Skip to content

Commit

Permalink
Added Index Management Commands.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunihiko Kido committed May 11, 2015
1 parent 4edf6e3 commit 72ddffd
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 41 deletions.
3 changes: 3 additions & 0 deletions Default.sublime-commands
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,7 @@
{"command": "show_active_server", "caption": "Elasticsearch: Show Active Server"},

{"command": "reindex", "caption": "Elasticsearch: Reindex"},
{"command": "dumpdata", "caption": "Elasticsearch: Dump Data"},
{"command": "loaddata", "caption": "Elasticsearch: Load Data"},
{"command": "copy_index", "caption": "Elasticsearch: Copy Data from ..."},
]
5 changes: 1 addition & 4 deletions Elasticsearch.sublime-settings
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
"base_url": "http://localhost:9200/",
"index": "test",
"doc_type": "test",
"analyzer": "default",
"hettp_headers": {}
}
},
// Path to the Apache Bench (ab) command. If ab is on you path, you should not need to
Expand All @@ -19,6 +17,5 @@
"pretty_command": "pretty_json",
"pretty_syntax": "Elasticsearch",

"ask_to_search_types": false,
"backup_location": ""
"fixture_dir": ""
}
2 changes: 1 addition & 1 deletion Lib/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def create(self, index, doc_type, body, id=None, params=None, command=None):
return self.index(index, doc_type, body, id, params=params, command=command)

def index(self, index, doc_type, body, id=None, params=None, command=None):
method = 'POST' if id is not None else 'PUT'
method = 'PUT' if id else 'POST'
result = self.request(
method, make_path(index, doc_type, id), body=body, params=params)
return show_result_json(result.json(), command=command)
Expand Down
116 changes: 105 additions & 11 deletions Lib/elasticsearch/helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,64 @@
import gzip
import json
import sublime
from itertools import islice
from operator import methodcaller

from .utils import show_result_json
from .utils import make_url


def change_index(hits, index):
for hit in hits:
hit['_index'] = index
yield hit


def expand_action(data):
data = data.copy()
action = {'index': {}}
for key in ('_index', '_parent', '_percolate', '_routing', '_timestamp',
'_ttl', '_type', '_version', '_version_type', '_id',
'_retry_on_conflict'):
if key in data:
action['index'][key] = data.pop(key)

return action, data.get('_source', data)


def bulk_index(client, docs, chunk_size=500, **kwargs):
success, failed = 0, 0
errors = []
actions = map(expand_action, docs)

while True:
chunk = islice(actions, chunk_size)

bulk_actions = []
for action, data in chunk:
bulk_actions.append(json.dumps(action))
if data is not None:
bulk_actions.append(json.dumps(data))

if not bulk_actions:
break

body = "\n".join(bulk_actions) + "\n"
response = client.bulk(body, **kwargs)

for op_type, item in map(methodcaller('popitem'), response['items']):
ok = 200 <= item.get('status', 500) < 300
if not ok:
errors.append(item)
failed += 1
else:
success += 1

sublime.status_message("docs: {}".format(success + failed))

return success, failed if not errors else errors


def scan(client, query=None, scroll='5m', **kwargs):

kwargs['params'] = dict(search_type='scan', scroll=scroll)
Expand Down Expand Up @@ -36,7 +91,7 @@ def scan(client, query=None, scroll='5m', **kwargs):


def reindex(client, source_index, target_index, query=None, target_client=None,
scroll='5m', scan_kwargs={}, command=None):
chunk_size=500, scroll='5m', scan_kwargs={}, command=None):

target_client = client if target_client is None else target_client

Expand All @@ -48,19 +103,58 @@ def reindex(client, source_index, target_index, query=None, target_client=None,
docs = scan(client, query=query, index=source_index,
scroll=scroll, **scan_kwargs)

success, failed = 0, 0
success, failed = bulk_index(
target_client, change_index(docs, target_index), chunk_size=chunk_size)

for doc in docs:
response = target_client.index(
index=target_index, doc_type=doc['_type'],
body=json.dumps(doc['_source']), id=doc['_id'])
result['success'] = success
result['failed'] = failed

show_result_json(result, sort_keys=True, command=command)

copyindex = reindex


def dumpdata(outputfile, client, index, query=None,
scroll='5m', scan_kwargs={}, command=None):

result = {
"_source": make_url(client.base_url, index),
"_output": outputfile
}

docs = scan(client, query=query, index=index,
scroll=scroll, **scan_kwargs)

count = 0
with gzip.open(outputfile, 'wb') as f:
for doc in docs:
del doc['_score']

data = "{}\n".format(json.dumps(doc, ensure_ascii=False))
f.write(bytes(data, 'utf-8'))

count += 1
sublime.status_message("dumps: {}".format(count))

result['docs'] = count

show_result_json(result, sort_keys=True, command=command)


def loaddata(inputfile, client, index, chunk_size=500, command=None):
result = {
"_target": make_url(client.base_url, index),
"_input": inputfile
}

if 'error' in response.keys():
failed += 1
else:
success += 1
docs = []
with gzip.open(inputfile, 'rb') as f:
for doc in f:
docs.append(json.loads(doc.decode('utf-8')))
sublime.status_message("read: {}".format(len(docs)))

sublime.status_message("{0:_>10}".format(success + failed))
success, failed = bulk_index(
client, change_index(docs, index), chunk_size=chunk_size)

result['success'] = success
result['failed'] = failed
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Setting | Description
``enabled_pretty`` | enabled pretty json. required: [PrettyJson](https://github.com/dzhibas/SublimePrettyJson)
``pretty_command`` | pretty format command. default: ``pretty_json``
``pretty_syntax`` | pretty json target syntax. default: ``Elasticsearch``
``backup_location`` | Path to the Dump Data & Load Data command.
``fixture_dir`` | Path to the Dump Data & Load Data command.


#### servers.\*
Expand Down Expand Up @@ -238,9 +238,12 @@ open the Command Palette (``Shift + Command + P``) and enter ``Elasticsearch ...
* Elasticsearch: Apache Bench
* Elasticsearch: Apache Bench for Search Template

### Commmand for Helpers
### Commmand for Index Data Management.

* Elasticsearch: Reindex
* Elasticsearch: Dump Data
* Elasticsearch: Load Data
* Elasticsearch: Copy Data from ...

### Command for Sublime User Settings

Expand Down
140 changes: 117 additions & 23 deletions helpers.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,136 @@
import os.path
import glob
import datetime
import sublime

from .base import ElasticsearchCommand

from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex
from elasticsearch.helpers import dumpdata
from elasticsearch.helpers import loaddata
from elasticsearch.helpers import copyindex


class HelperBaseCommand(ElasticsearchCommand):
selected_index = 0

def select_server(self, callback):
self.server_choices = list(self.servers.keys())
self.server_choices.sort()
self.window.show_quick_panel(
self.server_choices, self.on_done,
selected_index=self.selected_index)
show_result_on_window = False
syntax = 'Packages/JavaScript/JSON.tmLanguage'
comfirm_message = None
comfirm_ok_title = "OK"

def get_server_settings(self, index):
selected = self.server_choices[index]
return self.servers[selected]
def is_comfirmed(self):
ok = sublime.ok_cancel_dialog(
"{}\n\nTarget: {}".format(
self.comfirm_message, self.active_server),
self.comfirm_ok_title)
if ok:
return True
sublime.status_message('Canceled .')
return False


class ReindexCommand(HelperBaseCommand):
result_window_title = "Reindex"
show_result_on_window = False
syntax = 'Packages/JavaScript/JSON.tmLanguage'
comfirm_message = 'Are you sure you want to reindex ?'
comfirm_ok_title = 'Reindex'

def run(self):
self.select_server(self.on_done)
def run(self, chunk_size=1000):
if not self.is_comfirmed():
return

self.request(reindex, self.esclient, source_index=self.index,
chunk_size=chunk_size, target_index=self.index)


class CopyIndexCommand(HelperBaseCommand):
result_window_title = "Copy Index Data"
comfirm_message = 'Are you sure you want to copy data ?'
comfirm_ok_title = 'Copy'

@property
def select_items(self):
items = list(self.servers.keys())
items.sort(reverse=True)
return items

def show_select_items(self, callback):
self.window.show_quick_panel(self.select_items, callback)

def run(self, chunk_size=1000):
self.chunk_size = chunk_size
self.show_select_items(self.on_done)

def source_esclient(self, index):
s = self.servers[self.select_items[index]]
return Elasticsearch(s.get('base_url'), s.get('http_headers'))

def source_index(self, index):
s = self.servers[self.select_items[index]]
return s.get('index')

def on_done(self, index):
if index == -1:
return

self.selected_index = index
if not self.is_comfirmed():
return

source_client = self.source_esclient(index)
source_index = self.source_index(index)

self.request(copyindex, source_client, source_index,
target_client=self.esclient, target_index=self.index,
chunk_size=self.chunk_size)

server = self.get_server_settings(index)
base_url = server.get('base_url')
headers = server.get('http_headers')
index = server.get('index')
client = Elasticsearch(base_url, headers)

self.request(reindex, client, source_index=index,
target_index=index)
class DumpdataCommand(HelperBaseCommand):
result_window_title = "Dump Data"
comfirm_message = 'Are you sure you want to dump data ?'
comfirm_ok_title = 'Dump data'

@property
def outputfile(self):
now = datetime.datetime.now()
filename = "dump-{0}-{1:%Y%m%d%H%M%S}.gz".format(self.index, now)
return os.path.join(self.fixture_dir, filename)

def is_enabled(self):
return os.path.isdir(self.fixture_dir)

def run(self, scroll='5m'):
if not self.is_comfirmed():
return

self.request(dumpdata, self.outputfile,
self.esclient, self.index, scroll=scroll)


class LoaddataCommand(HelperBaseCommand):
result_window_title = "Load Data"
comfirm_message = 'Are you sure you want to load data ?'
comfirm_ok_title = 'Load data'

def is_enabled(self):
return os.path.isdir(self.fixture_dir)

@property
def select_items(self):
items = glob.glob(os.path.join(self.fixture_dir, 'dump-*.gz'))
items.sort(reverse=True)
return items

def show_select_items(self, callback):
self.window.show_quick_panel(self.select_items, callback)

def run(self):
self.show_select_items(self.on_done)

def on_done(self, index):
if index == -1:
return

if not self.is_comfirmed():
return

inputfile = self.select_items[index]
self.request(loaddata, inputfile, self.esclient, self.index)

0 comments on commit 72ddffd

Please sign in to comment.