copy_stragglers.py

#! /usr/bin/env python
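"""
Re-copies "straggler" documents that compare_collections.py flagged as mismatched.

For each _id recorded in the mismatches file, the document is re-read from the source
collection and saved over the corresponding document in the destination collection.

Example invocation (the mismatches filename here is hypothetical; pass whatever file
compare_collections.py generated):

    ./copy_stragglers.py --source localhost:27017/prod_maestro.emails \
        --dest localhost:27017/destination_db.emails \
        --mismatches-file mismatches.txt
"""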
import argparse
from compare_collections import MismatchLogger
from faster_ordered_dict import FasterOrderedDict
import gevent
import gevent.monkey
from gevent.pool import Pool
from pymongo.read_preferences import ReadPreference
import time
import utils

log = utils.get_logger(__name__)

POOL_SIZE = 20


class Stats(object):
    def __init__(self):
        self.start_time = time.time()
        self.processed = 0
        self.not_found = 0
        self.total = None

    def log(self):
        # report progress using this instance's own counters rather than the module-level
        # `stats` global, so the method works on any Stats object
        log.info("%d / %d processed | %d not found",
                 self.processed, self.total, self.not_found)


def copy_document_worker(query_doc, source_collection, dest_collection, stats):
    """
    greenlet function that copies a document identified by the query document

    there is a *very* narrow race condition where the document might be deleted from the
    source between our find() and save(); that seems an acceptable risk
    """
    docs = [doc for doc in source_collection.find(query_doc)]
    assert len(docs) <= 1
    if len(docs) == 0:
        # if the document has been deleted from the source, we assume that the oplog applier
        # will delete it from the destination in the future
        stats.not_found += 1
        stats.processed += 1
    else:
        # we have the document, so copy it
        dest_collection.save(docs[0])
        stats.processed += 1


def stats_worker(stats):
    """
    prints stats periodically
    """
    while True:
        gevent.sleep(3)
        stats.log()


if __name__ == '__main__':
    utils.tune_gc()
    gevent.monkey.patch_socket()

    parser = argparse.ArgumentParser(
        description='Reads JSON documents containing the _ids and shard keys of mismatched '
                    'documents from the mismatches file written by compare_collections.py, '
                    'and re-copies those documents from the source to the destination.')
    parser.add_argument(
        '--source', type=str, required=True, metavar='URL',
        help='source to read from; e.g. localhost:27017/prod_maestro.emails')
    parser.add_argument(
        '--dest', type=str, required=True, metavar='URL',
        help='destination to copy to; e.g. localhost:27017/destination_db.emails')
    parser.add_argument(
        '--mismatches-file', type=str, default=None, required=True, metavar='FILENAME',
        help='read ids to copy from this file, which is generated by compare_collections.py')
    args = parser.parse_args()

    # connect to source and destination
    source = utils.parse_mongo_url(args.source)
    source_client = utils.mongo_connect(source['host'], source['port'],
                                        ensure_direct=True,
                                        max_pool_size=POOL_SIZE,
                                        read_preference=ReadPreference.SECONDARY_PREFERRED,
                                        document_class=FasterOrderedDict)
    source_collection = source_client[source['db']][source['collection']]
    # require a direct connection to either a mongos or a primary, per the error message
    if not source_client.is_mongos and not source_client.is_primary:
        raise Exception("source must be a mongos instance or a primary")
    dest = utils.parse_mongo_url(args.dest)
    dest_client = utils.mongo_connect(dest['host'], dest['port'],
                                      max_pool_size=POOL_SIZE,
                                      document_class=FasterOrderedDict)
    dest_collection = dest_client[dest['db']][dest['collection']]
    if source == dest:
        raise ValueError("source and destination cannot be the same!")

    # periodically print stats
    stats = Stats()
    stats_greenlet = gevent.spawn(stats_worker, stats)

    # copy documents!
    pool = Pool(POOL_SIZE)
    with open(args.mismatches_file) as mismatches_file:
        lines = mismatches_file.readlines()  # copy everything into memory -- hopefully that isn't huge
        stats.total = len(lines)
        for line in lines:
            query_doc = {'_id': MismatchLogger.decode_mismatch_id(line)}
            pool.spawn(copy_document_worker,
                       query_doc=query_doc,
                       source_collection=source_collection,
                       dest_collection=dest_collection,
                       stats=stats)

    # wait for everything to finish
    gevent.sleep()
    pool.join()
    stats_greenlet.kill()

    stats.log()
    log.info('done')