-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstorage.py
135 lines (97 loc) · 4.29 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
from collections.abc import Mapping

from bson.objectid import ObjectId
from pymongo import MongoClient
# Module-level logger named after this module; the Storage methods below use
# it for their informational messages.
_log = logging.getLogger(__name__)
class Storage(object):
    """Convenience wrapper around a single PyMongo database.

    The connection is opened by the constructor when both ``uri`` and
    ``db_name`` are given; otherwise it is opened by :meth:`connect` or by
    the first access to :attr:`db`.  The collection helpers (``insert``,
    ``find``, ...) never connect implicitly: they raise ``Exception`` when
    no connection has been opened yet.
    """

    _NOT_CONNECTED = "Not connected to storage. Did you call connect()?"

    def __init__(self, uri=None, db_name=None):
        """Store the settings and connect right away if both are provided.

        Args:
            uri: Value handed to ``MongoClient`` when connecting.
            db_name: Name of the database to select on the client.
        """
        self._db = None
        self._db_name = db_name
        self._client = None
        self._uri = uri
        self._username = ''
        self._password = ''
        if self._uri is not None and self._db_name is not None:
            self.connect()

    @property
    def db(self):
        """The database handle, connecting first if that has not happened.

        Raises:
            Exception: If ``uri`` or ``db_name`` has not been configured.
        """
        # Compare against None explicitly rather than relying on truthiness
        # of the database object.
        if self._db is None:
            self.connect()
        return self._db

    @property
    def db_name(self):
        """Name of the database selected by :meth:`connect`."""
        return self._db_name

    @db_name.setter
    def db_name(self, db_name):
        self._db_name = db_name

    @property
    def uri(self):
        """Value passed to ``MongoClient`` by :meth:`connect`."""
        return self._uri

    @uri.setter
    def uri(self, uri):
        self._uri = uri

    @staticmethod
    def _redact_uri(uri):
        """Return ``uri`` with any ``user:password@`` part masked for logs."""
        if not isinstance(uri, str):
            return uri
        scheme, sep, rest = uri.partition("://")
        if not sep:
            return uri
        # Only look for credentials in the authority part (before the first
        # "/"), so an "@" inside the path or options is left alone.
        authority = rest.split("/", 1)[0]
        _userinfo, at, hosts = authority.rpartition("@")
        if not at:
            return uri
        return scheme + sep + "***@" + hosts + rest[len(authority):]

    def _require_db(self):
        """Return the database handle or raise if not connected yet."""
        if self._db is None:
            raise Exception(self._NOT_CONNECTED)
        return self._db

    def connect(self):
        """Create the ``MongoClient`` and select the configured database.

        Raises:
            Exception: If ``uri`` or ``db_name`` has not been configured.
        """
        if self.uri is None or self._db_name is None:
            raise Exception("Storage hasn't been configured")
        # The URI may embed credentials; keep them out of the log output.
        _log.info("Connecting to '%s'", self._redact_uri(self.uri))
        self._client = MongoClient(self._uri)
        self._db = self._client[self._db_name]

    def authenticate(self, username, passwd):
        """Authenticate against the connected database.

        Raises:
            Exception: If no connection has been opened yet.
        """
        # NOTE(review): Database.authenticate() was deprecated in PyMongo 3.x
        # and removed in PyMongo 4.0, where credentials must be given to
        # MongoClient instead -- confirm which PyMongo version is deployed.
        self._require_db().authenticate(username, passwd)

    def insert(self, collection, documents):
        """Insert one document, or a list of documents, into ``collection``.

        Args:
            collection: Name of the target collection.
            documents: A single document, or a ``list`` of documents.

        Returns:
            The result of ``insert_many`` for a non-empty list, the result of
            ``insert_one`` for a single document, or ``None`` for an empty
            list (nothing is sent in that case).

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        if isinstance(documents, list):
            if not documents:
                # Nothing to write; an empty batch is silently skipped.
                return None
            _log.info("Inserting documents into '%s'", collection)
            return db[collection].insert_many(documents)
        _log.info("Inserting documents into '%s'", collection)
        return db[collection].insert_one(documents)

    def update(self, collection, selector, options):
        """Upsert a single document in ``collection``.

        Args:
            collection: Name of the target collection.
            selector: Filter passed to ``update_one``.
            options: Update document passed to ``update_one``.

        Returns:
            The result of ``update_one(selector, options, upsert=True)``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Updating document in '%s'", collection)
        return db[collection].update_one(selector, options, upsert=True)

    def remove(self, collection, spec_or_id=None):
        """Delete a single document from ``collection``.

        Args:
            collection: Name of the target collection.
            spec_or_id: Either a filter mapping, or a bare ``_id`` value,
                which is turned into ``{"_id": spec_or_id}``.  ``None`` is
                forwarded unchanged to ``delete_one``.

        Returns:
            The result of ``delete_one``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Removing documents from '%s'", collection)
        # delete_one() only takes a mapping filter, so wrap a bare id value.
        if spec_or_id is not None and not isinstance(spec_or_id, Mapping):
            spec_or_id = {"_id": spec_or_id}
        return db[collection].delete_one(spec_or_id)

    def aggregate(self, collection, stages):
        """Run an aggregation pipeline and return its output as a list.

        Args:
            collection: Name of the target collection.
            stages: Pipeline passed to ``Collection.aggregate``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Aggregating documents in '%s'", collection)
        # aggregate() hands back a cursor to iterate, not a {'result': ...}
        # document, so drain it directly.
        return list(db[collection].aggregate(stages))

    def count(self, collection, selector):
        """Return ``count_documents(selector)`` for ``collection``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Counting documents in '%s'", collection)
        return db[collection].count_documents(selector)

    def find(self, collection, selector, projection=None, order=None):
        """Query ``collection`` and return the matching documents as a list.

        Args:
            collection: Name of the target collection.
            selector: Filter passed to ``find``.
            projection: Optional projection passed to ``find``.
            order: Optional sort specification; when truthy it is forwarded
                to the cursor's ``sort``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Querying documents in '%s'", collection)
        cursor = db[collection].find(selector, projection)
        if order:
            cursor = cursor.sort(order)
        return list(cursor)

    def find_one(self, collection, selector, projection=None):
        """Return the result of ``find_one(selector, projection)``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Querying single document in '%s'", collection)
        return db[collection].find_one(selector, projection)

    def distinct(self, collection, field, query=None):
        """Return the result of ``distinct(field, query)`` on ``collection``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Querying distinct %s documents in '%s'", field, collection)
        return db[collection].distinct(field, query)

    def create_index(self, collection, selector):
        """Create an index on ``collection`` via ``create_index(selector)``.

        Raises:
            Exception: If no connection has been opened yet.
        """
        db = self._require_db()
        _log.info("Creating index in '%s'", collection)
        return db[collection].create_index(selector)
# Shared module-level instance. It is created without a URI or database name,
# so no connection is opened here; set `uri` and `db_name` and call connect()
# before using the collection helpers.
storage = Storage()