forked from nyaadevs/nyaa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport_to_es.py
executable file
·134 lines (119 loc) · 4.78 KB
/
import_to_es.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python
"""
Bulk load torents from mysql into elasticsearch `nyaav2` index,
which is assumed to already exist.
This is a one-shot deal, so you'd either need to complement it
with a cron job or some binlog-reading thing (TODO)
"""
import sys
import json
# This should be progressbar33
import progressbar
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
from elasticsearch import helpers
from nyaa import create_app, models
from nyaa.extensions import db
app = create_app('config')
es = Elasticsearch(hosts=app.config['ES_HOSTS'], timeout=30)
ic = IndicesClient(es)
def pad_bytes(in_bytes, size):
return in_bytes + (b'\x00' * max(0, size - len(in_bytes)))
# turn into thing that elasticsearch indexes. We flatten in
# the stats (seeders/leechers) so we can order by them in es naturally.
# we _don't_ dereference uploader_id to the user's display name however,
# instead doing that at query time. I _think_ this is right because
# we don't want to reindex all the user's torrents just because they
# changed their name, and we don't really want to FTS search on the user anyway.
# Maybe it's more convenient to derefence though.
def mk_es(t, index_name):
return {
"_id": t.id,
"_type": "torrent",
"_index": index_name,
"_source": {
# we're also indexing the id as a number so you can
# order by it. seems like this is just equivalent to
# order by created_time, but oh well
"id": t.id,
"display_name": t.display_name,
"created_time": t.created_time,
# not analyzed but included so we can render magnet links
# without querying sql again.
"info_hash": pad_bytes(t.info_hash, 20).hex(),
"filesize": t.filesize,
"uploader_id": t.uploader_id,
"main_category_id": t.main_category_id,
"sub_category_id": t.sub_category_id,
"comment_count": t.comment_count,
# XXX all the bitflags are numbers
"anonymous": bool(t.anonymous),
"trusted": bool(t.trusted),
"remake": bool(t.remake),
"complete": bool(t.complete),
# TODO instead of indexing and filtering later
# could delete from es entirely. Probably won't matter
# for at least a few months.
"hidden": bool(t.hidden),
"deleted": bool(t.deleted),
"has_torrent": t.has_torrent,
# Stats
"download_count": t.stats.download_count,
"leech_count": t.stats.leech_count,
"seed_count": t.stats.seed_count,
}
}
# page through an sqlalchemy query, like the per_fetch but
# doesn't break the eager joins its doing against the stats table.
# annoying that this isn't built in somehow.
def page_query(query, limit=sys.maxsize, batch_size=10000, progress_bar=None):
start = 0
while True:
# XXX very inelegant way to do this, i'm confus
stop = min(limit, start + batch_size)
if stop == start:
break
things = query.slice(start, stop)
if not things:
break
had_things = False
for thing in things:
had_things = True
yield(thing)
if not had_things or stop == limit:
break
if progress_bar:
progress_bar.update(start)
start = min(limit, start + batch_size)
FLAVORS = [
('nyaa', models.NyaaTorrent),
('sukebei', models.SukebeiTorrent)
]
# Get binlog status from mysql
with app.app_context():
master_status = db.engine.execute('SHOW MASTER STATUS;').fetchone()
position_json = {
'log_file': master_status[0],
'log_pos': master_status[1]
}
print('Save the following in the file configured in your ES sync config JSON:')
print(json.dumps(position_json))
for flavor, torrent_class in FLAVORS:
print('Importing torrents for index', flavor, 'from', torrent_class)
bar = progressbar.ProgressBar(
maxval=torrent_class.query.count(),
widgets=[ progressbar.SimpleProgress(),
' [', progressbar.Timer(), '] ',
progressbar.Bar(),
' (', progressbar.ETA(), ') ',
])
# turn off refreshes while bulk loading
ic.put_settings(body={'index': {'refresh_interval': '-1'}}, index=flavor)
bar.start()
helpers.bulk(es, (mk_es(t, flavor) for t in page_query(torrent_class.query, progress_bar=bar)), chunk_size=10000)
bar.finish()
# Refresh the index immideately
ic.refresh(index=flavor)
print('Index refresh done.')
# restore to near-enough real time
ic.put_settings(body={'index': {'refresh_interval': '30s'}}, index=flavor)