-
Notifications
You must be signed in to change notification settings - Fork 48
/
chdb.py
282 lines (253 loc) · 10.8 KB
/
chdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import config
import utils
import MySQLdb
import contextlib
import os
import time
import warnings
# Fallback location of the MySQL option file for the database replicas.
_DEFAULT_REPLICA_MY_CNF = os.path.expanduser('~/replica.my.cnf')
# The config file to use for connecting to database replicas; the
# REPLICA_MY_CNF environment variable overrides the fallback above.
REPLICA_MY_CNF = os.getenv('REPLICA_MY_CNF', _DEFAULT_REPLICA_MY_CNF)
# The config file to use for connecting to our own database; the CH_MY_CNF
# environment variable overrides it, otherwise the replica config is reused.
CH_MY_CNF = os.getenv('CH_MY_CNF', REPLICA_MY_CNF)
# Host of our own MySQL server, used when running on Tool Labs.
TOOLS_LABS_CH_MYSQL_HOST = 'tools.db.svc.eqiad.wmflabs'
class _RetryingConnection(object):
'''
Wraps a MySQLdb connection, handling retries as needed.
'''
MAX_RETRIES = 10
def __init__(self, connect, sleep = time.sleep):
self._connect = connect
self._sleep = sleep
self._do_connect()
def _do_connect(self):
for retry in range(self.MAX_RETRIES):
try:
self.conn = self._connect()
except MySQLdb.OperationalError:
if retry == self.MAX_RETRIES - 1:
raise
else:
self._sleep(2 ** retry)
else:
break
self.conn.ping(True) # set the reconnect flag
def execute_with_retry(self, operations, *args, **kwds):
for retry in range(self.MAX_RETRIES):
try:
with self.conn.cursor() as cursor:
return operations(cursor, *args, **kwds)
except MySQLdb.OperationalError:
if retry == self.MAX_RETRIES - 1:
raise
else:
self._sleep(2 ** retry)
self._do_connect()
else:
break
def execute_with_retry_s(self, sql, *args):
def operations(cursor, sql, *args):
cursor.execute(sql, args)
if cursor.rowcount > 0:
return cursor.fetchall()
return None
return self.execute_with_retry(operations, sql, *args)
def __getattr__(self, name):
return getattr(self.conn, name)
@contextlib.contextmanager
def ignore_warnings(category = None):
    '''
    Context manager that ignores warnings of `category` (MySQLdb.Warning
    when not given) inside the with-block.

    The warning filters in place before entry are restored on exit, also
    when the block raises. Filters installed by other code are left intact;
    they are not wiped as warnings.resetwarnings() would do.
    '''
    if category is None:
        category = MySQLdb.Warning
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category = category)
        yield
def _connect(**kwds):
    '''
    Open a MySQLdb connection using the utf8mb4 charset with autocommit
    enabled. Any further keyword arguments go straight to MySQLdb.connect.
    '''
    connect = MySQLdb.connect
    return connect(autocommit = True, charset = 'utf8mb4', **kwds)
def _connect_to_ch_mysql():
    '''
    Open a connection to Citation Hunt's own MySQL server. Connection
    settings are read from CH_MY_CNF. On Tool Labs, the host is set to
    TOOLS_LABS_CH_MYSQL_HOST.
    '''
    if utils.running_in_tools_labs():
        extra = {'host': TOOLS_LABS_CH_MYSQL_HOST}
    else:
        extra = {}
    return _connect(read_default_file = CH_MY_CNF, **extra)
def _connect_to_wp_mysql(cfg):
    '''
    Open a connection to the Wikipedia replica that hosts cfg.database.

    Settings are read from REPLICA_MY_CNF. On Tool Labs, the host name is
    derived from the database name. Otherwise, if the CH_LOCAL_SSH_PORT
    environment variable is set (local development through SSH port
    forwarding), the connection goes to 127.0.0.1 on that port.
    '''
    kwds = {'read_default_file': REPLICA_MY_CNF}
    local_ssh_port = os.getenv('CH_LOCAL_SSH_PORT')
    if utils.running_in_tools_labs():
        # Get the project database name (and ultimately the database server's
        # hostname) from the name of the database we want, as per:
        # https://wikitech.wikimedia.org/wiki/Help:Tool_Labs/Database#Naming_conventions
        # Strip only the trailing '_p' (e.g. 'enwiki_p' -> 'enwiki').
        # str.replace would also remove any '_p' inside the name.
        xxwiki = cfg.database
        if xxwiki.endswith('_p'):
            xxwiki = xxwiki[:-len('_p')]
        kwds['host'] = '%s.analytics.db.svc.eqiad.wmflabs' % xxwiki
    elif local_ssh_port is not None:
        # Local development with SSH port forwarding.
        kwds.update({'port': int(local_ssh_port), 'host': '127.0.0.1'})
    return _connect(**kwds)
def _make_tools_labs_dbname(cursor, database, lang_code):
cursor.execute("SELECT SUBSTRING_INDEX(USER(), '@', 1)")
user = cursor.fetchone()[0]
return '%s__%s_%s' % (user, database, lang_code)
def _use(cursor, database, lang_code):
    '''
    Make `cursor`'s connection select our database for `database` and
    `lang_code` (named by _make_tools_labs_dbname) with a USE statement.
    '''
    dbname = _make_tools_labs_dbname(cursor, database, lang_code)
    cursor.execute('USE ' + dbname)
# Methods that connect and help introspect into our databases. They do not
# create databases or tables, so are suitable for use in the serving path
# (see https://phabricator.wikimedia.org/T216213).
def get_table_name(db, database, table):
    '''
    Return "<dbname>.<table>", where <dbname> is our database for `database`
    and the language of the current localized config.
    '''
    lang_code = config.get_localized_config().lang_code
    dbname = _make_tools_labs_dbname(db.cursor(), database, lang_code)
    return '.'.join((dbname, table))
def init_db(lang_code):
    '''
    Return a _RetryingConnection to the citationhunt database for lang_code.

    The database is selected inside the connect callable, so every
    reconnection the wrapper makes through it selects the database again.
    '''
    def open_and_select():
        conn = _connect_to_ch_mysql()
        cur = conn.cursor()
        _use(cur, 'citationhunt', lang_code)
        return conn
    return _RetryingConnection(open_and_select)
def init_scratch_db():
    '''
    Return a _RetryingConnection to the scratch database for the language of
    the current localized config.

    The database is selected inside the connect callable, so every
    reconnection the wrapper makes through it selects the database again.
    '''
    cfg = config.get_localized_config()
    def open_and_select():
        conn = _connect_to_ch_mysql()
        cur = conn.cursor()
        _use(cur, 'scratch', cfg.lang_code)
        return conn
    return _RetryingConnection(open_and_select)
def init_stats_db():
    '''
    Return a _RetryingConnection to the stats database, which is shared by
    all languages (its lang_code component is 'global').
    '''
    def open_and_select():
        conn = _connect_to_ch_mysql()
        cur = conn.cursor()
        _use(cur, 'stats', 'global')
        return conn
    return _RetryingConnection(open_and_select)
def init_wp_replica_db(lang_code):
    '''
    Return a _RetryingConnection to the Wikipedia replica database named by
    the localized config for lang_code (cfg.database).

    The database is selected with USE on every connection made through the
    wrapper's connect callable.
    '''
    cfg = config.get_localized_config(lang_code)

    def open_replica():
        conn = _connect_to_wp_mysql(cfg)
        with conn.cursor() as cur:
            cur.execute('USE ' + cfg.database)
        return conn

    return _RetryingConnection(open_replica)
# Hard-coded value returned by get_en_projectindex_database_name().
_EN_PROJECTINDEX_DATABASE_NAME = 's52475__wpx_p'
def get_en_projectindex_database_name():
    '''Return the hard-coded name of the English project index database.'''
    return _EN_PROJECTINDEX_DATABASE_NAME
# Methods for use in batch scripts, not the serving frontend. These set up the
# databases, help populate the scratch database and swap it with the serving
# database.
def _create_citationhunt_tables(cfg, cursor):
cursor.execute('''
CREATE TABLE IF NOT EXISTS categories (id VARCHAR(128) PRIMARY KEY,
title VARCHAR(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
INSERT IGNORE INTO categories VALUES("unassigned", "unassigned")
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS intersections (
id VARCHAR(128) PRIMARY KEY, expiration DATETIME)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (page_id INT(8) UNSIGNED
PRIMARY KEY, url VARCHAR(512), title VARCHAR(512))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles_categories (
article_id INT(8) UNSIGNED, category_id VARCHAR(128),
FOREIGN KEY(article_id) REFERENCES articles(page_id)
ON DELETE CASCADE,
FOREIGN KEY(category_id) REFERENCES categories(id)
ON DELETE CASCADE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles_intersections (
article_id INT(8) UNSIGNED, inter_id VARCHAR(128),
PRIMARY KEY(article_id, inter_id),
FOREIGN KEY(article_id) REFERENCES articles(page_id)
ON DELETE CASCADE,
FOREIGN KEY(inter_id) REFERENCES intersections(id)
ON DELETE CASCADE)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS category_article_count (
category_id VARCHAR(128), article_count INT(8) UNSIGNED,
FOREIGN KEY(category_id) REFERENCES categories(id)
ON DELETE CASCADE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS snippets (id VARCHAR(128) PRIMARY KEY,
snippet VARCHAR(%s), section VARCHAR(768), article_id INT(8)
UNSIGNED, oldest_template_date DATETIME, FOREIGN KEY(article_id)
REFERENCES articles(page_id) ON DELETE CASCADE) ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
''', (cfg.snippet_max_size * 10,))
cursor.execute('''
CREATE TABLE IF NOT EXISTS snippets_links (prev VARCHAR(128),
next VARCHAR(128), cat_id VARCHAR(128), inter_id VARCHAR(128),
FOREIGN KEY(prev) REFERENCES snippets(id) ON DELETE CASCADE,
FOREIGN KEY(next) REFERENCES snippets(id) ON DELETE CASCADE,
FOREIGN KEY(cat_id) REFERENCES categories(id) ON DELETE CASCADE,
FOREIGN KEY(inter_id) REFERENCES intersections(id) ON DELETE CASCADE)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
def _create_stats_tables(cfg, cursor):
# No FOREIGN KEY for inter_id because we want to keep the stats even when
# the intersection expires.
cursor.execute('''
CREATE TABLE IF NOT EXISTS requests (
ts DATETIME, lang_code VARCHAR(10), snippet_id VARCHAR(128),
category_id VARCHAR(128), url VARCHAR(768), prefetch BOOLEAN,
status_code INTEGER, referrer VARCHAR(128), inter_id VARCHAR(128))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS fixed (
clicked_ts DATETIME, snippet_id VARCHAR(128) UNIQUE,
lang_code VARCHAR(10), rev_id INT(8) DEFAULT -1,
inter_id VARCHAR(128))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
# Create per-language views for convenience
for lang_code in cfg.lang_codes_to_lang_names:
cursor.execute('''
CREATE OR REPLACE VIEW requests_''' + lang_code +
''' AS SELECT * FROM requests WHERE lang_code = %s
''', (lang_code,))
cursor.execute('''
CREATE OR REPLACE VIEW fixed_''' + lang_code +
''' AS SELECT * FROM fixed WHERE lang_code = %s
''', (lang_code,))
def initialize_all_databases():
    '''
    Set up all of our databases for the current localized language.

    Steps, all run with MySQLdb warnings ignored:
    1. Drop the scratch database.
    2. Create (if missing) the citationhunt and scratch databases for this
       language, and the shared 'global' stats database.
    3. Create the Citation Hunt tables in scratch and citationhunt, and the
       stats tables in stats.
    '''
    def create_database(cursor, database, lang_code):
        # sql_mode is cleared before each CREATE DATABASE.
        name = _make_tools_labs_dbname(cursor, database, lang_code)
        cursor.execute('SET SESSION sql_mode = ""')
        cursor.execute(
            'CREATE DATABASE IF NOT EXISTS %s CHARACTER SET utf8mb4' % name)

    cfg = config.get_localized_config()
    db = _RetryingConnection(_connect_to_ch_mysql)
    with db.cursor() as cursor, ignore_warnings():
        scratch_name = _make_tools_labs_dbname(
            cursor, 'scratch', cfg.lang_code)
        cursor.execute('DROP DATABASE IF EXISTS ' + scratch_name)
        # The stats database is shared by all languages.
        database_langs = (
            ('citationhunt', cfg.lang_code),
            ('scratch', cfg.lang_code),
            ('stats', 'global'),
        )
        for database, db_lang in database_langs:
            create_database(cursor, database, db_lang)
        table_setup = (
            ('scratch', cfg.lang_code, _create_citationhunt_tables),
            ('citationhunt', cfg.lang_code, _create_citationhunt_tables),
            ('stats', 'global', _create_stats_tables),
        )
        for database, db_lang, create_tables in table_setup:
            _use(cursor, database, db_lang)
            create_tables(cfg, cursor)
def install_scratch_db():
    '''
    Swap the freshly built scratch database into service.

    For every table T in the scratch database, one RENAME TABLE statement
    moves citationhunt.T to scratch.old_T and scratch.T to citationhunt.T.
    The scratch database, now holding the old_ tables, is then dropped.

    Raises:
        RuntimeError: if the scratch database has no tables, so no RENAME
            statement could be generated.
    '''
    cfg = config.get_localized_config()
    # Keep the connection wrapper bound to a local for the whole with-block,
    # instead of leaving it as an unreferenced temporary whose lifetime
    # would depend on the cursor keeping the connection alive.
    db = init_db(cfg.lang_code)
    with db.cursor() as cursor:
        chname = _make_tools_labs_dbname(cursor, 'citationhunt', cfg.lang_code)
        scname = _make_tools_labs_dbname(cursor, 'scratch', cfg.lang_code)
        # generate a sql query that will atomically swap tables in
        # 'citationhunt' and 'scratch'. Modified from:
        # http://blog.shlomoid.com/2010/02/emulating-missing-rename-database.html
        # GROUP_CONCAT output is truncated at group_concat_max_len. The
        # generated statement grows with the number of tables and the
        # database-name lengths. NOTE(review): 2048 is presumably enough for
        # the current tables; revisit if tables are added.
        cursor.execute('''SET group_concat_max_len = 2048;''')
        cursor.execute('''
SELECT CONCAT('RENAME TABLE ',
GROUP_CONCAT('%s.', table_name,
' TO ', table_schema, '.old_', table_name, ', ',
table_schema, '.', table_name, ' TO ', '%s.', table_name),';')
FROM information_schema.TABLES WHERE table_schema = '%s'
GROUP BY table_schema;
''' % (chname, chname, scname))
        row = cursor.fetchone()
        # With GROUP BY, an empty scratch database yields no row at all.
        if row is None or row[0] is None:
            raise RuntimeError(
                'Scratch database %s has no tables to install' % scname)
        rename_stmt = row[0]
        cursor.execute(rename_stmt)
        cursor.execute('DROP DATABASE ' + scname)