elasticsearchdriver.py
# -*- coding: UTF-8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from elasticsearch.exceptions import NotFoundError
from driver import BaseDriver, BaseQuerySet, Operation, DEFAULT_FETCH_SIZE


class ElasticsearchDriver(BaseDriver):
    """
    Driver that reads and writes records for a single Elasticsearch index
    and document type.
    """
    _client = None
    _index_name = None
    _doc_type = None
    _fetch_size = None
    _scroll_expiry_time = None

    def __init__(self, client, index_name, doc_type,
                 fetch_size=DEFAULT_FETCH_SIZE, scroll_expiry_time="5m"):
        self._client = client
        self._index_name = index_name
        self._doc_type = doc_type
        self._fetch_size = fetch_size
        self._scroll_expiry_time = scroll_expiry_time

    def get_queryset(self):
        body = {"query": {"match_all": {}}}
        return ElasticsearchQuerySet(self._client, body,
                                     self._scroll_expiry_time,
                                     self._index_name, self._doc_type)

    def get_record(self, id):
        try:
            data = self._client.get(
                index=self._index_name,
                id=id,
                doc_type=self._doc_type,
            )
        except NotFoundError:
            return None
        # Flatten the hit into a neutral dict and parse the timestamp.
        record = data["_source"]
        record["id"] = data["_id"]
        record["last_modified"] = datetime.strptime(
            record["last_modified"], "%Y-%m-%d %H:%M:%S")
        return record

    def execute_operation(self, operation):
        if operation.oper_type == Operation.OPER_INSERT:
            # Index a new document under the record's id.
            self._client.index(
                index=self._index_name,
                doc_type=self._doc_type,
                body={
                    "title": operation.data["title"],
                    "description": operation.data["description"],
                    "release_year": operation.data["release_year"],
                    "length": operation.data["length"],
                    "rating": operation.data["rating"],
                    "last_modified": operation.data["last_modified"].strftime(
                        "%Y-%m-%d %H:%M:%S"),
                },
                id=operation.data["id"],
            )
        elif operation.oper_type == Operation.OPER_UPDATE:
            # Apply a partial ("doc") update to the existing document.
            self._client.update(
                index=self._index_name,
                doc_type=self._doc_type,
                id=operation.data["id"],
                body={
                    "doc": {
                        "title": operation.data["title"],
                        "description": operation.data["description"],
                        "release_year": operation.data["release_year"],
                        "length": operation.data["length"],
                        "rating": operation.data["rating"],
                        "last_modified": operation.data["last_modified"].strftime(
                            "%Y-%m-%d %H:%M:%S"),
                    }
                },
            )
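
# Usage sketch (not part of the driver): how an insert would typically be
# issued through execute_operation. The Operation class lives in driver.py;
# the keyword-argument constructor below is an assumption, so adjust it to
# the real signature. The index/doc_type names and field values are made-up
# example data.
#
#   driver = ElasticsearchDriver(client, "films", "film")
#   driver.execute_operation(Operation(
#       oper_type=Operation.OPER_INSERT,
#       data={
#           "id": "1",
#           "title": "Example Title",
#           "description": "Example description.",
#           "release_year": 2006,
#           "length": 86,
#           "rating": "PG",
#           "last_modified": datetime.now(),
#       },
#   ))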


class ElasticsearchQuerySet(BaseQuerySet):
    """
    Implementation of the QuerySet logic for the Elasticsearch driver.
    """
    _scan_iter = None

    def __init__(self, client, query, scroll_expiry_time, index_name, doc_type):
        """
        Thanks to the "scan and scroll" helper in the Python client, our job
        here is simple :) The helper pages through the results for us.
        """
        self._scan_iter = scan(client, query, scroll_expiry_time,
                               index=index_name, doc_type=doc_type)

    def _fetch_row(self):
        """
        Return the next available row, taking care to convert to a neutral
        structure.
        """
        data = next(self._scan_iter)
        row = data["_source"]
        row["id"] = data["_id"]
        row["last_modified"] = datetime.strptime(
            row["last_modified"], "%Y-%m-%d %H:%M:%S")
        return row
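

# Minimal wiring sketch: everything below is an assumption about the
# surrounding project (a local Elasticsearch node, plus an index called
# "films" holding documents of type "film"); adjust the names to your setup.
if __name__ == "__main__":
    es = Elasticsearch()  # pre-8.x client; defaults to localhost:9200
    driver = ElasticsearchDriver(es, "films", "film")

    # Fetch one record by id; get_record returns None for missing documents.
    print(driver.get_record("1"))

    # Stream every document. This assumes BaseQuerySet (see driver.py) turns
    # _fetch_row into the iterator protocol; if it exposes a different API,
    # use that instead.
    # for row in driver.get_queryset():
    #     print(row["id"], row["title"])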