-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
248 lines (222 loc) · 8.46 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from inspect import currentframe, getframeinfo
import traceback
import utils
class Database:
    """Wrapper around a Redis (or Redis Cluster) client used to store GitHub
    actor, repo and per-repo event data.

    Key layout written/read by this class (``%`` is the separator):

    * ``<actor_type>%<actor_login>``               -- hash of actor fields
    * ``repo%<owner>/<name>``                      -- hash of repo fields
    * ``<owner>/<name>%<data_type>%<created_at>``  -- hash of one event's fields
    * ``<owner>/<name>%<data_type>``               -- set of ``created_at`` values
    * ``<owner>/<name>%members``                   -- set of ``str(member_dict)``
    """

    ##################################################################
    # Init
    ##################################################################
    def __init__(self, dryrun=False, readonly=False):
        """Create the DB client selected by the ``Type`` setting (section ``DB``).

        :param dryrun: when true, the ``add_*`` methods still validate their
            input but skip every write.
        :param readonly: forwarded as ``readonly_mode`` to
            ``StrictRedisCluster``; ignored for plain Redis.
        :raises Exception: ``"Error initializing Redis: ..."`` if the config
            cannot be read, the DB type is neither ``redis`` nor
            ``rediscluster``, or the client cannot be created.
        """
        # Set before connecting so every backend branch shares them.
        self.__rc = None
        self.__dryrun = dryrun
        try:
            import config
            cfg = config.Config()
            db_type = cfg.get('Type', 'DB')
            if db_type == "redis":
                import redis
                self.__host = cfg.get("HOST", sec="Redis", default="localhost")
                self.__port = cfg.get("PORT", sec="Redis", default="6379")
                self.__db = cfg.get("DATABASE", sec="Redis", default="0")
                self.__rc = redis.StrictRedis(host=self.__host, port=self.__port, db=self.__db)
            elif db_type == "rediscluster":
                import json
                from rediscluster import StrictRedisCluster
                nodes = json.loads(cfg.get("NODES", "RedisCluster"))
                self.__rc = StrictRedisCluster(startup_nodes=nodes, decode_responses=True,
                                               readonly_mode=readonly)
            else:
                # Fail fast instead of leaving an object with no client.
                raise Exception("unsupported DB type %r" % (db_type,))
        except Exception as e:
            raise Exception("Error initializing Redis: %s" % (str(e)))

    ##################################################################
    # Memory used
    ##################################################################
    def memused(self):
        """Return the ``used_memory`` field of the server's ``INFO`` output."""
        assert self.__rc, "Failed to get DB memused: DB not setup"
        try:
            return self.__rc.info()['used_memory']
        except Exception as e:
            traceback.print_exc()
            raise Exception("Error dumping memory info: %s" % (str(e)))

    ##################################################################
    # Database size
    ##################################################################
    def size(self):
        """Return the number of keys in the selected database (``DBSIZE``)."""
        assert self.__rc, "Failed to get DB size: DB not setup"
        try:
            return self.__rc.dbsize()
        except Exception as e:
            traceback.print_exc()
            raise Exception("Error dumping index size: %s" % (str(e)))

    ##################################################################
    # Batching
    ##################################################################
    # iterate a list in batches of size n
    # source: https://stackoverflow.com/questions/22255589/get-all-keys-in-redis-database-with-python
    def __batcher(self, iterable, n):
        """Group *iterable* into n-tuples; the last tuple is padded with None."""
        try:
            # Python 3
            from itertools import zip_longest
        except ImportError:
            # Python 2
            from itertools import izip_longest as zip_longest
        # n references to the SAME iterator, so zip_longest pulls n items per tuple.
        args = [iter(iterable)] * n
        return zip_longest(*args)

    # in batches of 500
    def __get_matching_batches(self, key_pattern, batch_size=500):
        """Scan keys matching *key_pattern* and yield them in tuples of *batch_size*."""
        return self.__batcher(self.__rc.scan_iter(key_pattern), batch_size)

    ##################################################################
    # Shared helpers
    ##################################################################
    @staticmethod
    def __as_str(value):
        """Return *value* decoded as UTF-8 when it is bytes, else unchanged.

        The plain Redis client is created without ``decode_responses`` and
        returns bytes, while the cluster client is created with
        ``decode_responses=True`` and already returns str.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    @staticmethod
    def __require_full_name(repo_fullname, caller, echo=False):
        """Raise unless *repo_fullname* is a full ``owner/name`` repo name.

        :param caller: public method name, used in the error message.
        :param echo: print the rejected name before raising.
        """
        if '/' in repo_fullname:
            return
        if echo:
            print('repo_fullname: ' + str(repo_fullname))
        # example: 'foo/bar' where @foo is the GitHub user and @bar is the repo name
        raise Exception('db.%s accepts full repo name (e.g., foo/bar)' % (caller))

    @staticmethod
    def __abort(err, context=None):
        """Report a fatal write error and terminate with exit status 1.

        Must be called from inside an ``except`` block so the traceback of
        the active exception is printed.

        :param err: the caught exception.
        :param context: optional extra line printed before the error.
        :raises SystemExit: always, with code 1.
        """
        if context is not None:
            print(context)
        print('ERROR: ' + str(err))
        traceback.print_exc()
        frame = currentframe()
        # Report the add_* method that handled the error, not this helper.
        caller = frame.f_back if frame is not None else None
        if caller is not None:
            frameinfo = getframeinfo(caller)
            print(frameinfo.filename, frameinfo.lineno)
        # Same effect as the site-provided exit(1), without depending on it.
        raise SystemExit(1)

    ##################################################################
    # convert None to ''
    ##################################################################
    def convert_none_to_empty(self, data_dict):
        """Return a copy of *data_dict* with every value converted to str.

        ``None`` becomes ``''``; every other value goes through ``str()`` so
        the mapping can be written as a Redis hash.
        """
        return {k: ('' if v is None else str(v)) for k, v in data_dict.items()}

    ##################################################################
    # decode from bytes to str
    ##################################################################
    def decode_redis(self, src):
        """Convert a Redis reply into str-based Python values.

        Lists and sets become a list of str; dicts become a str->str dict.
        Items that are already str are kept as-is. Any other *src* is
        returned unchanged.
        """
        if isinstance(src, (list, set)):
            return [self.__as_str(item) for item in src]
        if isinstance(src, dict):
            return {self.__as_str(k): self.__as_str(v) for k, v in src.items()}
        return src

    ##################################################################
    # Add GitHub user data
    ##################################################################
    def add_actor(self, actor_login, actor_type, data_dict):
        """Store *data_dict* as the hash ``<actor_type>%<actor_login>``.

        Skipped in dry-run mode. On any error the details are printed and
        the process exits with status 1.
        """
        try:
            assert self.__rc, "Failed to add actor %s: DB not setup" % (actor_type)
            if self.__dryrun:
                return
            key = actor_type + '%' + actor_login
            # NOTE: hmset is deprecated in newer redis-py in favour of
            # hset(key, mapping=...); kept for compatibility with older clients.
            self.__rc.hmset(key, self.convert_none_to_empty(data_dict))
        except Exception as e:
            self.__abort(e)

    ##################################################################
    # Get repos
    ##################################################################
    def get_all_repos(self):
        """Yield the full name (``owner/name``) of every stored repo.

        Keys matching ``repo%*`` are scanned incrementally in batches of 500.
        Because this is a generator, errors surface during iteration.

        :raises Exception: wrapping any scan/decoding error, or raised when a
            scanned key lacks the ``repo%`` prefix.
        """
        prefix = 'repo%'
        try:
            for batch in self.__get_matching_batches(prefix + '*'):
                for _, val in utils.for_each_item_int(list(batch)):
                    # zip_longest pads the last batch with None; skip padding.
                    if not val:
                        continue
                    val = self.__as_str(val)
                    if not val.startswith(prefix):
                        raise Exception('Invalid repo name %s!' % (val))
                    # Strip only the leading prefix, not every occurrence.
                    yield val[len(prefix):]
        except Exception as e:
            traceback.print_exc()
            raise Exception("Failed to get all repos from DB: %s!" % (str(e)))

    def get_repo(self, repo_fullname):
        """Return the ``repo%<repo_fullname>`` hash as a str->str dict."""
        try:
            val = self.__rc.hgetall('repo%' + repo_fullname)
            return self.decode_redis(val)
        except Exception as e:
            raise Exception('Failed to get repo %s: %s' % (repo_fullname, str(e)))

    def get_data(self, repo_fullname, data_type):
        """Return the members of the set ``<repo_fullname>%<data_type>`` as str.

        These are the ``created_at`` values indexed by ``add_data``.
        """
        try:
            key = repo_fullname + '%' + data_type
            val = self.__rc.smembers(key)
            return self.decode_redis(val)
        except Exception as e:
            raise Exception('Failed to get repo %s data on %s: %s' % \
                (repo_fullname, data_type, str(e)))

    ##################################################################
    # Add GitHub repo data
    ##################################################################
    def add_repo(self, repo_fullname, data_dict):
        """Store *data_dict* as the hash ``repo%<repo_fullname>``.

        *repo_fullname* must be ``owner/name``. Skipped in dry-run mode
        (after validation). On any error the details are printed and the
        process exits with status 1.
        """
        try:
            self.__require_full_name(repo_fullname, 'add_repo')
            assert self.__rc, "Failed to add repo %s: DB not setup" % (repo_fullname)
            if self.__dryrun:
                return
            key = 'repo%' + repo_fullname
            self.__rc.hmset(key, self.convert_none_to_empty(data_dict))
        except Exception as e:
            self.__abort(e, 'full repo name: ' + str(repo_fullname))

    ##################################################################
    # Add event data for a repo
    ##################################################################
    def add_data(self, repo_fullname, created_at, data_type, data_dict):
        """Store one event of *data_type* for a repo and index its timestamp.

        Writes the hash ``<repo>%<data_type>%<created_at>`` and adds
        *created_at* to the set ``<repo>%<data_type>``. Skipped in dry-run
        mode (after validation). On any error the details are printed and
        the process exits with status 1.
        """
        try:
            self.__require_full_name(repo_fullname, 'add_data', echo=True)
            assert self.__rc, "Failed to add %s data for %s: DB not setup" % \
                (data_type, repo_fullname)
            if self.__dryrun:
                return
            converted_data = self.convert_none_to_empty(data_dict)
            if not converted_data:
                # redis-py's hmset refuses an empty mapping; store a
                # placeholder field so the event hash still exists.
                converted_data = {'': ''}
            key = repo_fullname + '%' + data_type + '%' + created_at
            self.__rc.hmset(key, converted_data)
            key = repo_fullname + '%' + data_type
            self.__rc.sadd(key, created_at)
        except Exception as e:
            self.__abort(e)

    ##################################################################
    # Add Member
    ##################################################################
    def add_member(self, full_repo_name, m_dict):
        """Add ``str(m_dict)`` to the set ``<full_repo_name>%members``.

        Skipped in dry-run mode (after validation). On any error the details
        are printed and the process exits with status 1.
        """
        try:
            self.__require_full_name(full_repo_name, 'add_member', echo=True)
            assert self.__rc, "Failed to add member to %s: DB not setup" % (full_repo_name)
            if self.__dryrun:
                return
            key = full_repo_name + '%' + 'members'
            # Members are stored as the str() of the dict, not as a hash.
            self.__rc.sadd(key, str(m_dict))
        except Exception as e:
            self.__abort(e)
##################################################################
# unit test
##################################################################
def test():
    """Smoke-test the DB layer by printing the stored hash of every repo.

    Needs a reachable database configured through ``config.Config``; this is
    an integration check rather than an isolated unit test.
    """
    database = Database()
    for name in database.get_all_repos():
        print(database.get_repo(name))
    # Extra probes that can be enabled while debugging:
    #   print(database.size())
    #   for kind in ('stars', 'forks', 'releases'):
    #       for item in database.get_data(name, kind):
    #           print(item)
##################################################################
# Main
##################################################################
if __name__ == '__main__':
    # Running the module directly performs the live-database smoke test.
    test()