forked from miracle2k/linuxutils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbconvert.py
executable file
·161 lines (135 loc) · 6.61 KB
/
dbconvert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python
"""
Script to convert between different databases.

Written against SQLAlchemy 0.6 (beta).

Based on code from:
http://www.tylerlesmann.com/2009/apr/27/copying-databases-across-platforms-sqlalchemy/

TODO: Not using the ORM is likely faster, but more work to write: we'd
need to construct queries manually. Also, even with the ORM, there are
probably some SQLAlchemy-related optimizations that could be employed to
speed up the processing of large tables (expunge_all?).

TODO: Quite frequently, schema conversion doesn't work because SQLAlchemy is
quite strict about schemas. For example, SQLite has no LONGTEXT column, and
MySQL requires a fixed-length VARCHAR. These requirements are not
automatically bridged. Possibly a way could be provided for users to define
the mapper themselves. Note that this is basically only a schema-creation
issue: you can already work around such an error by defining the target
table manually.
"""
import optparse
import sys
import time
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
def make_session(connection_string, convert_unicode):
    """Create an engine for ``connection_string`` plus a session bound to it.

    :param connection_string: SQLAlchemy database URL.
    :param convert_unicode: passed straight through to ``create_engine``.
    :returns: a ``(session, engine)`` tuple.
    """
    engine = create_engine(
        connection_string,
        echo=False,
        convert_unicode=convert_unicode,
    )
    session_factory = sessionmaker(bind=engine)
    session = session_factory()
    return session, engine
def pull_data(from_db, to_db, options):
    """Copy the reflected tables of ``from_db`` into ``to_db``.

    :param from_db: SQLAlchemy URL of the source database.
    :param to_db: SQLAlchemy URL of the destination database.
    :param options: parsed optparse options; this reads ``tables``,
        ``create_tables``, ``merge``, ``yield``, ``flush`` and ``commit``.
    """
    # Note about encodings: We use "convert_unicode" for the source
    # but not the destination connection. The hope here is that the data
    # read is available in unicode, and the destination backend then has the
    # responsibility to properly handle the unicode data. "convert_unicode"
    # seems somewhat dangerous on write, because the backend gets only a
    # bytestring, and it seems like SQLAlchemy (0.6beta2) does not sync its
    # own "encoding" setting (used by convert_unicode) with the MySQLdb
    # "charset" option (defaults to latin1). As a result, you have a utf8
    # string that is processed by the server as latin1.
    source, sengine = make_session(from_db, convert_unicode=True)
    smeta = MetaData(bind=sengine)
    destination, dengine = make_session(to_db, convert_unicode=False)

    try:
        print('Pulling schemas from source server')
        # ``only=None`` (no --table given) reflects every table.
        smeta.reflect(only=options.tables)

        if options.create_tables:
            # create_all() handles every reflected table in a single call;
            # running it once per table repeated the work for all tables on
            # every iteration.
            print('Creating tables on destination server')
            smeta.create_all(dengine)

        # sorted_tables lists referenced (parent) tables before the tables
        # that reference them. Each table is committed on its own, so this
        # keeps foreign-key-enforcing destinations from rejecting child rows.
        for table in smeta.sorted_tables:
            print('Processing table "%s"' % table.name)
            _copy_table(source, destination, table, options)
    finally:
        source.close()
        destination.close()


def _copy_table(source, destination, table, options):
    """Stream every row of ``table`` from ``source`` into ``destination``, then commit."""
    NewRecord = quick_mapper(table)
    columns = table.columns.keys()
    # 'yield' is a Python keyword, so the option can only be read via getattr.
    batch_size = getattr(options, 'yield')

    num_records = source.query(table).count()
    i = 0
    start = time.time()
    # Note that yield only affects the number of ORM objects generated
    # by SA; the MySQLdb backend still fetches all rows at once.
    # Try OurSQL. References for this:
    # * http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg17389.html
    # * http://stackoverflow.com/questions/2145177/is-this-a-memory-leak-a-program-in-python-with-sqlalchemy-sqlite
    for record in source.query(table).yield_per(batch_size):
        # str() keys so the dict can be passed as **kwargs on Python 2.
        data = dict((str(column), getattr(record, column)) for column in columns)
        if options.merge:
            # TODO: Can we use load=False here? And should we?
            destination.merge(NewRecord(**data))
        else:
            destination.add(NewRecord(**data))

        i += 1
        if options.flush and i % options.flush == 0:
            destination.flush()
        if options.commit and i % options.commit == 0:
            destination.commit()

        now = time.time()
        elapsed = now - start
        # num_records was counted before streaming began. If the table grew
        # meanwhile (or was empty then), clamp the denominator so the fraction
        # stays <= 1 and never divides by zero. i >= 1 here, so done > 0.
        done = i / float(max(num_records, i))
        sys.stderr.write(
            '...Transferring record %d/%d (%d%%), %ds elapsed, %ds estimated\r'
            % (i, num_records, done * 100, elapsed, elapsed / done))
        sys.stderr.flush()
    sys.stderr.write("\n")
    print('...Transferred %d records in %f seconds' % (i, time.time() - start))
    print('...Committing changes')
    destination.commit()
def get_usage():
    """Return the usage text for the option parser.

    ``%prog`` is left in place on purpose; optparse substitutes the program
    name when it prints the usage.
    """
    # The stray ')' that used to follow '/database' has been removed.
    return """usage: %prog [options] FROM TO
FROM/TO syntax: driver://user[:password]@host[:port]/database
Example: mysql://root@db2:3307/reporting"""
def quick_mapper(table):
    """Build and return a throwaway declarative class mapped onto ``table``.

    Each call uses its own fresh declarative base, so mapped classes made for
    different tables do not share a registry.
    """
    mapper_base = declarative_base()

    # NOTE(review): declarative mapping presumably needs a primary key on
    # ``table``; tables without one may fail here -- confirm.
    class GenericMapper(mapper_base):
        __table__ = table

    return GenericMapper
def main(argv=None):
    """Parse the command line and copy the FROM database into TO.

    :param argv: argument list without the program name; defaults to
        ``sys.argv[1:]`` so existing ``main()`` callers are unaffected.
    :returns: ``1`` on a usage error, ``None`` after a successful run.
    """
    parser = optparse.OptionParser(usage=get_usage())
    parser.add_option('-t', '--table', dest="tables", action="append",
                      help="convert only this table (can be given multiple times)",
                      metavar="NAME")
    parser.add_option('--skip-schema', dest="create_tables", default=True,
                      action='store_false',
                      help="do not create tables in the destination database")
    parser.add_option('--merge', dest="merge", action='store_true',
                      help="merge with existing data based on primary key; "
                           "use if the target table already has rows; up to "
                           "15 times slower.")
    parser.add_option('-y', '--yield', dest="yield", default=4000,
                      type="int", metavar="NUM",
                      help="number of source rows to pull into memory in one "
                           "batch; some backends like MySQLdb still fetch "
                           "everything anyway (default: %default)")
    parser.add_option('-f', '--flush', dest="flush", default=10000,
                      type="int", metavar="NUM",
                      help="number of rows to cache in memory before sending "
                           "queries to the destination database "
                           "(default: %default)")
    parser.add_option('-c', '--commit', dest="commit", default=None,
                      type="int", metavar="NUM",
                      help="number of rows after which to commit and start a "
                           "new transaction; implies a flush (default: "
                           "only commit when done)")

    if argv is None:
        argv = sys.argv[1:]
    options, args = parser.parse_args(argv)

    if len(args) < 2:
        parser.print_usage()
        sys.stderr.write("error: you need to specify FROM and TO urls\n")
        return 1
    if len(args) > 2:
        parser.print_usage()
        sys.stderr.write("error: unexpected arguments: %s\n" % ", ".join(args[2:]))
        return 1

    from_url, to_url = args
    pull_data(from_url, to_url, options)
if __name__ == '__main__':
    # main() returns None on success; map that to a zero exit status.
    exit_status = main()
    sys.exit(exit_status if exit_status else 0)