From 8559a477f8733c0ea1ce7b3288d047d14004ccd4 Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Wed, 18 Apr 2018 16:25:24 +0200 Subject: [PATCH 01/11] Add quote around the password in the mysqlimport command --- etlalchemy/ETLAlchemySource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index fcc7195..c6728d0 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -666,7 +666,7 @@ def send_data(self, table, columns): self.logger.info( "Sending data to target MySQL instance...(Fast [mysqlimport])") columns = map(lambda c: "\`{0}\`".format(c), columns) - cmd = ("mysqlimport -v -h{0} -u{1} -p{2} " + cmd = ("mysqlimport -v -h{0} -u{1} -p'{2}' " "--compress " "--local " "--fields-terminated-by=\",\" " @@ -1080,7 +1080,7 @@ def migrate( data_file_path = os.getcwd() + "/{0}.sql".format(T.name) if os.path.isfile(data_file_path): os.remove(data_file_path) - # Delete the old file if it esists (i.e. if a previous run went + # Delete the old file if it exists (i.e. if a previous run went # bad and didn't clean up...) dst_meta.reflect(self.dst_engine) From 6971f7b5d1e3226d03fc668f77f47d65e9bdc41b Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Mon, 23 Apr 2018 12:27:21 +0200 Subject: [PATCH 02/11] Change: Start to move to two-phase import, not loading all table data into memory at once. Not very effective yet, due to opening and closing export file for every row. --- etlalchemy/ETLAlchemySource.py | 356 ++++++++++++--------------------- 1 file changed, 133 insertions(+), 223 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index c6728d0..8524fd9 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -113,6 +113,7 @@ def __init__(self, self.column_count = 0 self.table_count = 0 self.empty_table_count = 0 + self.missing_table_count = 0 self.empty_tables = [] self.deleted_table_count = 0 self.deleted_column_count = 0 @@ -159,8 +160,14 @@ def get_nearest_power_of_two(self, num): return i - 2 - def standardize_column_type(self, column, raw_rows): - old_column_class = column.type.__class__ + def standardize_column_type(self, column, distinct_column_rows): + """ + + :param column: column in question + :param distinct_column_rows: all distinct values for column + :return: a new column instance + """ + column_copy = Column(column.name, column.type, nullable=column.nullable, @@ -171,24 +178,20 @@ def standardize_column_type(self, column, raw_rows): """""""""""""""""""""""""""""""" """ *** STANDARDIZATION *** """ """""""""""""""""""""""""""""""" - idx = self.current_ordered_table_columns.index(column.name) ############################## # Duck-typing to remove # database-vendor specific column types ############################## - base_classes = map( - lambda c: c.__name__.upper(), - column.type.__class__.__bases__) - self.logger.info("({0}) {1}".format(column.name, - column.type.__class__.__name__)) + base_classes = map(lambda c: c.__name__.upper(), column.type.__class__.__bases__) + self.logger.info("({0}) {1}".format(column.name, column.type.__class__.__name__)) self.logger.info("Bases: {0}".format(str(base_classes))) # Assume the column is empty, unless told otherwise null = True if "ENUM" in base_classes: - for r in raw_rows: - if r[idx] is not None: + for r in distinct_column_rows: + if r[0] is not None: null = False # Hack for error 'postgresql enum type requires a name' if self.dst_engine.dialect.name.lower() == 
"postgresql": @@ -210,17 +213,12 @@ def standardize_column_type(self, column, raw_rows): ################################## column_copy.type.collation = None max_data_length = 0 - for row in raw_rows: - data = row[idx] - if data is not None: + for row in distinct_column_rows: + if row[0] is not None: null = False # Update varchar(size) - if len(data) > max_data_length: - max_data_length = len(data) - if isinstance(row[idx], unicode): - row[idx] = row[idx].encode('utf-8', 'ignore') - else: - row[idx] = row[idx].decode('utf-8', 'ignore').encode('utf-8') + if len(row[0]) > max_data_length: + max_data_length = len(row[0]) if self.compress_varchar: # Let's reduce the "n" in VARCHAR(n) to a power of 2 if max_data_length > 0: @@ -255,18 +253,13 @@ def standardize_column_type(self, column, raw_rows): # Strip collation here ... ################################## column_copy.type.collation = None - for row in raw_rows: - data = row[idx] - if varchar_length and data and len(data) > varchar_length: + for row in distinct_column_rows: + if varchar_length and row[0] and len(row[0]) > varchar_length: self.logger.critical( "Length of column '{0}' exceeds VARCHAR({1})".format( column.name, str(varchar_length))) - if data is not None: + if row[0] is not None: null = False - if isinstance(row[idx], unicode): - row[idx] = row[idx].encode('utf-8', 'ignore') - #if row[idx]: - # row[idx] = row[idx].decode('utf-8', 'ignore') elif "DATE" in base_classes or "DATETIME" in base_classes: #################################### @@ -275,16 +268,13 @@ def standardize_column_type(self, column, raw_rows): ################################### type_count = {} types = set([]) - for row in raw_rows: - data = row[ - self.current_ordered_table_columns.index( - column.name)] - types.add(data.__class__.__name__) - if type_count.get(data.__class__.__name__): - type_count[data.__class__.__name__] += 1 + for row in distinct_column_rows: + types.add(row[0].__class__.__name__) + if type_count.get(row[0].__class__.__name__): + type_count[row[0].__class__.__name__] += 1 else: - type_count[data.__class__.__name__] = 1 - if data is not None: + type_count[row[0].__class__.__name__] = 1 + if row[0] is not None: null = False self.logger.warning(str(type_count)) if type_count.get("datetime"): @@ -310,27 +300,24 @@ def standardize_column_type(self, column, raw_rows): maxDigit = 0 type_count = {} types = set([]) - for row in raw_rows: - data = row[ - self.current_ordered_table_columns.index( - column.name)] - types.add(data.__class__.__name__) - if type_count.get(data.__class__.__name__): - type_count[data.__class__.__name__] += 1 + for row in distinct_column_rows: + types.add(row[0].__class__.__name__) + if type_count.get(row[0].__class__.__name__): + type_count[row[0].__class__.__name__] += 1 else: - type_count[data.__class__.__name__] = 1 + type_count[row[0].__class__.__name__] = 1 ###################### # Check for NULL data # (We will drop column if all rows contain null) ###################### - if data is not None: + if row[0] is not None: null = False - if data.__class__.__name__ == 'Decimal' or\ - data.__class__.__name__ == 'float': - splt = str(data).split(".") + if row[0].__class__.__name__ == 'Decimal' or\ + row[0].__class__.__name__ == 'float': + splt = str(row[0]).split(".") if len(splt) == 1: intCount += 1 - maxDigit = max(data, maxDigit) + maxDigit = max(row, maxDigit) continue left_hand_digits = splt[0] @@ -347,9 +334,9 @@ def standardize_column_type(self, column, raw_rows): # Short circuit the above 'and' so we don't keep resetting 
mantissa_gt_zero mantissa_gt_zero = True - elif data.__class__.__name__ == 'int': + elif row.__class__.__name__ == 'int': intCount += 1 - maxDigit = max(data, maxDigit) + maxDigit = max(row, maxDigit) self.logger.info(" --> " + str(column.name) + "..." + str(type_count)) #self.logger.info("Max Digit Length: {0}".format(str(len(str(maxDigit))))) @@ -384,41 +371,41 @@ def standardize_column_type(self, column, raw_rows): self.logger.warning("Coercing to 'BigInteger'") column_copy.type = BigInteger() # Do conversion... - for r in raw_rows: - if r[idx] is not None: - r[idx] = long(r[idx]) + for r in distinct_column_rows: + if r[0] is not None: + r[0] = long(r[0]) else: column_copy.type = Integer() self.logger.warning("Coercing to 'Integer'") - for r in raw_rows: - if r[idx] is not None: - r[idx] = int(r[idx]) + for r in distinct_column_rows: + if r[0] is not None: + r[0] = int(r[0]) elif column.type.__class__.__name__ == "BIT": - for r in raw_rows: - if r[idx] is not None: + for r in distinct_column_rows: + if r[0] is not None: null = False self.logger.info("Found column of type 'BIT' -> " + "coercing to Boolean'") column_copy.type.__class__ = sqlalchemy.types.Boolean elif "TYPEENGINE" in base_classes: - for r in raw_rows: - if r[idx] is not None: + for r in distinct_column_rows: + if r[0] is not None: null = False self.logger.warning( "Type '{0}' has no base class!".format( column.type.__class__.__name__)) elif "VARBINARY" in base_classes or "LARGEBINARY" in base_classes: if self.dst_engine.dialect.name.lower() == "postgresql": - for r in raw_rows: - if r[idx] is not None: + for r in distinct_column_rows: + if r[0] is not None: null = False - r[idx] = r[idx].encode('hex') + r[0] = r[0].encode('hex') column_copy.type = LargeBinary() elif "_BINARY" in base_classes: - for r in raw_rows: - if r[idx] is not None: + for r in distinct_column_rows: + if r[0] is not None: null = False - r[idx] = r[idx].encode('hex') + r[0] = r[0].encode('hex') if self.dst_engine.dialect.name.lower() == "postgresql": column_copy.type = BYTEA() else: @@ -429,8 +416,8 @@ def standardize_column_type(self, column, raw_rows): # ... we need to check for null columns still b/c # ... we default to True ! ###################################################### - for r in raw_rows: - if r[idx] is not None: + for r in distinct_column_rows: + if r[0] is not None: null = False # Reset collations... 
if hasattr(column.type, 'collation'): @@ -465,25 +452,15 @@ def standardize_column_type(self, column, raw_rows): return column_copy - def add_or_eliminate_column( - self, - T, - T_dst_exists, - column, - column_copy, - raw_rows): + def add_or_eliminate_column(self, + T, + T_dst_exists, + column, + column_copy): self.logger.info("Checking column for elimination status...") - old_column_class = column.type.__class__ table_name = T.name - null = True idx = self.current_ordered_table_columns.index(column.name) - cname = column_copy.name - columnHasGloballyIgnoredSuffix = len( - filter( - lambda s: cname.find(s) > -1, - self.global_ignored_col_suffixes)) > 0 - oldColumns = self.current_ordered_table_columns oldColumnsLength = len(self.current_ordered_table_columns) ################################## @@ -579,8 +556,7 @@ def transform_data(self, T_src, raw_rows): # TC.clean(raw_rows) # Transform the schema second (by updating the column names [keys of # dict]) - self.schema_transformer.transform_rows( - raw_rows, self.original_ordered_table_columns, T_src.name) + self.schema_transformer.transform_rows(raw_rows, self.original_ordered_table_columns, T_src.name) def create_table(self, T_dst_exists, T): with self.dst_engine.connect() as conn: @@ -607,15 +583,12 @@ def send_data(self, table, columns): session = Session() data_file_path = os.getcwd() + "/" + table + ".sql" - self.logger.info( - "Transferring data from local file '{0}' to target DB".format( - table + ".sql")) + self.logger.info("Transferring data from local file '{0}' to target DB".format(table + ".sql")) if self.dst_engine.dialect.name.lower() == "mssql": username = self.dst_engine.url.username password = self.dst_engine.url.password dsn = self.dst_engine.url.host - db_name = list(self.dst_engine.execute( - "SELECT DB_NAME()").fetchall())[0][0] + db_name = list(self.dst_engine.execute("SELECT DB_NAME()").fetchall())[0][0] if not self.enable_mssql_bulk_insert: ###################################### # SQL Azure does not support BULK INSERT @@ -872,11 +845,7 @@ def dump_data(self, T_dst_exists, T, raw_rows, pks, sessionMaker): conn.close() # TODO: Have a 'Create' option for each table... - def migrate( - self, - destination_database_url, - migrate_data=True, - migrate_schema=True): + def migrate(self, destination_database_url, migrate_data=True, migrate_schema=True): """""""""""""""""""""""" """ ** REFLECTION ** """ """""""""""""""""""""""" @@ -885,15 +854,15 @@ def migrate( if self.database_url.split(":")[0] == "oracle+cx_oracle": try: - self.engine = create_engine( - self.database_url, arraysize=buffer_size) - except ImportError as e: + self.engine = create_engine(self.database_url, arraysize=buffer_size) + except ImportError: raise DBApiNotFound(self.database_url) else: try: self.engine = create_engine(self.database_url) - except ImportError as e: + except ImportError: raise DBApiNotFound(self.database_url) + # Create inspectors to gather schema info... 
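# For reference, a minimal standalone sketch of the reflection pattern relied on
# below -- the URL is a placeholder; Inspector.from_engine(), get_table_names()
# and reflecttable() are the stock SQLAlchemy 1.x reflection APIs this code uses:
from sqlalchemy import MetaData, Table, create_engine
from sqlalchemy.engine import reflection

engine = create_engine("sqlite:///source.db")  # placeholder source URL
insp = reflection.Inspector.from_engine(engine)
for table_name in insp.get_table_names():
    T_src = Table(table_name, MetaData())
    insp.reflecttable(T_src, None)  # populates T_src.columns with reflected types
    print(table_name, [c.name for c in T_src.columns])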
self.src_insp = reflection.Inspector.from_engine(self.engine) self.table_names = self.src_insp.get_table_names() @@ -911,14 +880,12 @@ def migrate( TablesIterator = self.table_names # defaults to ALL tables if self.included_tables and self.excluded_tables: - raise Exception("Can't provide 'included_tables'" + - "'excluded_tables', choose 1...aborting...") + raise Exception("Can't provide both 'included_tables' and 'excluded_tables', choose 1...aborting...") if self.included_tables: TablesIterator = self.included_tables elif self.excluded_tables: - TablesIterator = list(set(TablesIterator) - - set(self.excluded_tables)) + TablesIterator = list(set(TablesIterator) - set(self.excluded_tables)) t_idx = -1 t_total = len(TablesIterator) @@ -934,7 +901,7 @@ def migrate( ####################### self.times[table_name] = {} self.table_count += 1 - self.logger.info("Reading Table Schema '" + table_name + "'...") + self.logger.info("Reading Table Schema '%s'...", table_name) pk_count = 0 auto_inc_count = 0 @@ -942,13 +909,9 @@ def migrate( T_src = Table(table_name, MetaData()) try: self.src_insp.reflecttable(T_src, None) - except NoSuchTableError as table: - self.logger.error( - "Table '" + - table + - "' not found in DB: '" + - destination + - "'.") + except NoSuchTableError: + self.logger.error("Table '%s' not found in source DB.", table_name) + self.missing_table_count += 1 continue # skip to next table... except sqlalchemy.exc.DBAPIError as e: self.logger.error(str(e)) @@ -970,13 +933,10 @@ def migrate( T_dst_exists = True try: self.tgt_insp.reflecttable(T, None) - except sqlalchemy.exc.NoSuchTableError as e: + except sqlalchemy.exc.NoSuchTableError: T_dst_exists = False - self.logger.warning( - "Table '" + - T.name + - "' does not exist in the dst database " + - "(we will create this later...)") + self.logger.warning("Table '%s' does not exist in the dst database " + + "(we will create this later...)", T.name) """""""""""""""""""""""""" """ *** EXTRACTION *** """ @@ -990,40 +950,24 @@ def migrate( for i in range(0, len(cols)): self.original_ordered_table_columns[i] = cols[i] self.current_ordered_table_columns[i] = cols[i] + ################################### # Grab raw rows for data type checking... ################################## - self.logger.info( - "Building query to fetch all rows from {0}".format( - T_src.name)) - - + self.logger.info("Count all rows from %s", T_src.name) cnt = self.engine.execute(T_src.count()).fetchone()[0] - resultProxy = self.engine.execute(T_src.select()) - self.logger.info("Done. ({0} total rows)".format(str(cnt))) - j = 0 - self.logger.info("Loading all rows into memory...") - rows = [] - - for i in range(1, (cnt / buffer_size) + 1): - self.logger.info( - "Fetched {0} rows".format(str(i * buffer_size))) - rows += resultProxy.fetchmany(buffer_size) - rows += resultProxy.fetchmany(cnt % buffer_size) - # Don't rely on Python garbage collection... - resultProxy.close() + self.logger.info("Done. (%r total rows)", cnt) - assert(cnt == len(rows)) + # + # PASS ONE - Check source types + # + self.logger.info("Analyzing source data for %s", T_src.name) - raw_rows = [list(row) for row in rows] - self.logger.info("Done") pks = [] - t_start_transform = datetime.now() - # TODO: Use column/table mappers, would need to update foreign # keys... 
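# A standalone sketch of what "pass one" boils down to, assuming an `engine` and
# a reflected `T_src` Table as in the surrounding code: only the DISTINCT values
# of each column are pulled for type analysis, so the full row set no longer has
# to be held in memory.
from sqlalchemy import select

cnt = engine.execute(T_src.count()).fetchone()[0]  # total row count only
for column in T_src.columns:
    distinct_rows = engine.execute(select([column]).distinct())
    all_null = True
    for row in distinct_rows:  # each result row is a 1-tuple; row[0] is the value
        if row[0] is not None:
            all_null = False
    # ...derive a standardized destination type from the distinct sample...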
- + for column in T_src.columns: self.column_count += 1 ############################## @@ -1033,37 +977,35 @@ def migrate( if column.primary_key: pks.append(column.name.lower()) pk_count += 1 - + if column.autoincrement: auto_inc_count += 1 ############################## # Standardize Column Type ############################## - column_copy = self.standardize_column_type(column, raw_rows) + distinct_column_rows = self.engine.execute(select([column]).distinct()) + column_copy = self.standardize_column_type(column, distinct_column_rows) """""""""""""""""""""""""""""" """ *** ELIMINATION I *** """ """""""""""""""""""""""""""""" - self.add_or_eliminate_column( - T, T_dst_exists, column, column_copy, raw_rows) + self.add_or_eliminate_column(T, T_dst_exists, column, column_copy) if self.dst_engine.dialect.name.lower() == "mysql": ####################################### # Remove auto-inc on composite PK's ####################################### - self.check_multiple_autoincrement_issue( - auto_inc_count, pk_count, T) + self.check_multiple_autoincrement_issue(auto_inc_count, pk_count, T) + if self.transform_table(T) is None: # Skip the table, it is scheduled to be deleted... continue elif len(T.columns) == 0: # TODO: Delete table from T_dst - self.logger.warning( - "Table '" + T.name + "' has all NULL columns, " + - "skipping...") + self.logger.warning("Table '%s' has all NULL columns, skipping...", T.name) self.empty_table_count += 1 self.empty_tables.append(T.name) continue - elif len(raw_rows) == 0 and self.skip_table_if_empty: + elif cnt == 0 and self.skip_table_if_empty: self.logger.warning( "Table '" + T.name + "' has 0 rows, skipping...") self.empty_table_count += 1 @@ -1074,9 +1016,19 @@ def migrate( if not tableCreationSuccess: continue + + + # + # PASS TWO - Write source data to file + # + + #from pudb import set_trace;set_trace() + """""""""""""""""""""""""""""" """" *** INSERT ROWS *** """"" """""""""""""""""""""""""""""" + self.logger.info("Writing data to file...") + data_file_path = os.getcwd() + "/{0}.sql".format(T.name) if os.path.isfile(data_file_path): os.remove(data_file_path) @@ -1088,65 +1040,24 @@ def migrate( #self.tgt_insp.reflecttable(T, None) t_start_dump = datetime.now() t_start_load = datetime.now() - - row_buffer_size = 100000 - if self.dst_engine.dialect.name.lower() == 'mssql' and \ - not self.enable_mssql_bulk_insert: - # MSSQL limits the amount of INSERTS per query - row_buffer_size = 1000 - - if migrate_data: - self.logger.info("Transforming & Dumping " + - str(len(raw_rows)) + - " total rows from table '" + - str(T.name) + - "' into '{0}'.".format(data_file_path)) - # Create buffers of "100000" rows - # TODO: Parameterize "100000" as 'buffer_size' (should be - # configurable) - insertionCount = (len(raw_rows) / row_buffer_size) + 1 - raw_row_len = len(raw_rows) - self.total_rows += raw_row_len - if len(raw_rows) > 0: - for i in range(0, insertionCount): - startRow = 0 # i * 1000 - endRow = row_buffer_size # (i+1) * 1000 - virtualStartRow = i * row_buffer_size - virtualEndRow = (i + 1) * row_buffer_size - if virtualEndRow > raw_row_len: - virtualEndRow = raw_row_len - endRow = raw_row_len - self.logger.info( - " ({0}) -- Transforming rows: ".format( - T.name) + - str(virtualStartRow) + - " -> " + - str(virtualEndRow) + - "...({0} Total)".format( - str(raw_row_len))) - self.transform_data( - T_src, raw_rows[startRow:endRow]) - self.logger.info( - " ({0}) -- Dumping rows: " - .format(T.name) + - str(virtualStartRow) + - " -> " + - str(virtualEndRow) + - " to 
'{1}.sql'...({0} Total)" - .format(str(raw_row_len), T.name) + - "[Table {0}/{1}]".format(str(t_idx), str(t_total))) - self.dump_data( - T_dst_exists, T, raw_rows[startRow:endRow], - pks, Session) - del raw_rows[startRow:endRow] - - ####################################################### - # Now *actually* load the data via fast-CLI utilities - ####################################################### - t_start_load = datetime.now() - # From .sql - self.send_data( - T.name, self.current_ordered_table_columns) + + if migrate_data and cnt > 0: + + self.logger.info("Transforming & Dumping %i from table '%s' into '%s'.", cnt, T.name, data_file_path) + + for row in self.engine.execute(T_src.select()): + # Todo: What does transform_data() do, and could it do it while dumping the data? + ## self.logger.info(" ({0}) -- Transforming rows: ".format(T.name) + str(virtualStartRow) + " -> " + str(virtualEndRow) + "...({0} Total)".format(str(raw_row_len))) + ## self.transform_data(T_src, raw_rows[startRow:endRow]) + self.dump_data(T_dst_exists, T, [row], pks, Session) + + + ####################################################### + # Now *actually* load the data via fast-CLI utilities + ####################################################### + t_start_load = datetime.now() + # From .sql + self.send_data(T.name, self.current_ordered_table_columns) t_stop_load = datetime.now() @@ -1157,27 +1068,24 @@ def migrate( extraction_dt = t_start_transform - t_start_extract extraction_dt_str = str( extraction_dt.seconds / 60) + "m:" + \ - str(extraction_dt.seconds % 60) + "s" + str(extraction_dt.seconds % 60) + "s" transform_dt = t_start_dump - t_start_transform transform_dt_str = str( transform_dt.seconds / 60) + "m:" + \ - str(transform_dt.seconds % 60) + "s" + str(transform_dt.seconds % 60) + "s" dump_dt = t_start_load - t_start_dump dump_dt_str = str(dump_dt.seconds / 60) + \ - "m:" + str(dump_dt.seconds % 60) + "s" + "m:" + str(dump_dt.seconds % 60) + "s" load_dt = t_stop_load - t_start_load load_dt_str = str(load_dt.seconds / 60) + \ - "m:" + str(load_dt.seconds % 60) + "s" - - self.times[table_name][ - 'Extraction Time (From Source)'] = extraction_dt_str - self.times[table_name][ - 'Transform Time (Schema)'] = transform_dt_str - self.times[table_name][ - 'Data Dump Time (To File)'] = dump_dt_str + "m:" + str(load_dt.seconds % 60) + "s" + + self.times[table_name]['Extraction Time (From Source)'] = extraction_dt_str + self.times[table_name]['Transform Time (Schema)'] = transform_dt_str + self.times[table_name]['Data Dump Time (To File)'] = dump_dt_str self.times[table_name]['Load Time (Into Target)'] = load_dt_str # End first table loop... @@ -1647,6 +1555,7 @@ def print_timings(self): Total Tables: {0} -- Empty Tables (skipped) {1} -- Deleted Tables (skipped) {15} + -- Missing Tables (skipped) {19} -- Synced Tables {2}\n ========================\n Total Columns: {3} @@ -1687,7 +1596,8 @@ def print_timings(self): str(self.deleted_table_count), str(self.deleted_column_count), str(self.total_rows), - str(self.total_rows / ((dt.seconds / 60) or 1)))) + str(self.total_rows / ((dt.seconds / 60) or 1)), + str(self.missing_table_count))) # self.logger.warning("Referential Integrity " + # "Violations: \n" + "\n".join(self.riv_arr)) self.logger.warning( From f67b195e3b6d23bb1f8a77ac6f411b3992fc3583 Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Tue, 24 Apr 2018 11:11:55 +0200 Subject: [PATCH 03/11] Remove dependency on the py package. Remove dependency on specific old versions of SQLAlchemy and six. 
Updated the version number to 1.0.7. --- requirements.txt | 9 ++++----- setup.py | 13 ++++++------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/requirements.txt b/requirements.txt index 449ef46..3aca91e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,8 +7,7 @@ #psycopg2==2.6.1 #pyodbc==3.0.10 -py==1.4.31 -six==1.9.0 -SQLAlchemy==1.0.13 -sqlalchemy-migrate==0.9.7 -SQLAlchemy-Utils==0.30.9 +six +SQLAlchemy +sqlalchemy-migrate +SQLAlchemy-Utils diff --git a/setup.py b/setup.py index 2593357..18ca251 100644 --- a/setup.py +++ b/setup.py @@ -2,19 +2,18 @@ setup( name = 'etlalchemy', packages = ['etlalchemy'], - version = '1.0.6', + version = '1.0.7', description = 'Extract, Transform, Load. Migrate any SQL Database in 4 lines of code', author = 'Sean Harrington', author_email='seanharr11@gmail.com', url='https://github.com/seanharr11/etlalchemy', - download_url='https://github.com/seanharr11/etlalchemy/tarball/1.0.6', + download_url='https://github.com/seanharr11/etlalchemy/tarball/1.0.7', keywords=['sql','migration','etl','database'], install_requires = [ - "py == 1.4.31", - "six == 1.9.0", - "SQLAlchemy == 1.0.13", - "sqlalchemy-migrate == 0.9.7", - "SQLAlchemy-Utils == 0.30.9" + "six", + "SQLAlchemy", + "sqlalchemy-migrate", + "SQLAlchemy-Utils" ], classifiers=[], ) From 4d3ff9098e0c5d946eea68c37167cdab9f0a7fff Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Tue, 24 Apr 2018 11:17:13 +0200 Subject: [PATCH 04/11] Moved comment into doc string. --- etlalchemy/literal_value_generator.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/etlalchemy/literal_value_generator.py b/etlalchemy/literal_value_generator.py index 91bb68a..9baf11c 100644 --- a/etlalchemy/literal_value_generator.py +++ b/etlalchemy/literal_value_generator.py @@ -162,9 +162,10 @@ def dump_to_oracle_insert_statements(fp, engine, table, raw_rows, columns): fp.write(''.join(lines)) -# Supported by [MySQL, Postgresql, sqlite, SQL server (non-Azure) ] + def dump_to_csv(fp, table_name, columns, raw_rows, dialect): - lines = [] + """ Supported by [MySQL, Postgresql, sqlite, SQL server (non-Azure) ] """ + separator = "," # Determine the separator based on Target DB if dialect.name.lower() in ["sqlite"]: From 7e489f816f7b859d6b281142808bafd1ebac4036 Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Tue, 24 Apr 2018 11:18:47 +0200 Subject: [PATCH 05/11] Change: Added quote around user name parameter for mysqlimport. 
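Quoting the user name (and, in the earlier patch, the password) protects most
credentials, but a value that itself contains a single quote would still break
the shell command. A sketch of a more defensive alternative -- not what this
patch does -- is to skip the shell entirely and hand mysqlimport an argument
list; the variable names (host, username, password, db_name, data_file_path)
are the ones already used in send_data(), and the flags mirror the ones built
there:

    import subprocess

    cmd = [
        "mysqlimport", "-v", "--compress", "--local",
        "-h", host, "-u", username, "-p" + password,
        "--fields-terminated-by=,",
        "--fields-enclosed-by=\"",
        "--fields-escaped-by=\\",
        "--lines-terminated-by=\n",
        db_name, data_file_path,
    ]
    subprocess.check_call(cmd)  # no shell, so no quoting of credentials is needed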
--- etlalchemy/ETLAlchemySource.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 8524fd9..f9e7a3e 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -636,10 +636,9 @@ def send_data(self, table, columns): password = self.dst_engine.url.password db_name = self.dst_engine.url.database host = self.dst_engine.url.host - self.logger.info( - "Sending data to target MySQL instance...(Fast [mysqlimport])") + self.logger.info("Sending data to target MySQL instance...(Fast [mysqlimport])") columns = map(lambda c: "\`{0}\`".format(c), columns) - cmd = ("mysqlimport -v -h{0} -u{1} -p'{2}' " + cmd = ("mysqlimport -v -h{0} -u'{1}' -p'{2}' " "--compress " "--local " "--fields-terminated-by=\",\" " From c7977b253b4e4e2ea5041dca30ebeaed3ac69bff Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Tue, 24 Apr 2018 11:27:18 +0200 Subject: [PATCH 06/11] Change dump_datat() to only write one row at a time, and take the file pointer as a parameter (not opening and closing the file for each row). --- etlalchemy/ETLAlchemySource.py | 112 +++++++++++++++------------------ 1 file changed, 51 insertions(+), 61 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index f9e7a3e..15285ec 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -731,52 +731,48 @@ def send_data(self, table, columns): os.remove(data_file_path) self.logger.info("Done") - """ - Dumps the data to a file called .sql in the CWD. - Depending on the DB Target, either a CSV will be generated - for optimized BULK IMPORT, or an INSERT query will be generated - if BULK INSERTING a CSV is not supported (i.e. SQL Azure) - """ - - def dump_data(self, T_dst_exists, T, raw_rows, pks, sessionMaker): - t_start_load = datetime.now() - conn = self.dst_engine.connect() - s = sessionMaker(bind=conn) - data_file_path = os.getcwd() + "/{0}.sql".format(T.name) + def dump_data(self, T_dst_exists, T, raw_rows, pks, fp): + """ + Dumps the data to a file called .sql in the CWD. + Depending on the DB Target, either a CSV will be generated + for optimized BULK IMPORT, or an INSERT query will be generated + if BULK INSERTING a CSV is not supported (i.e. SQL Azure) + """ if not T_dst_exists: # Table "T" DNE in the destination table prior to this entire # migration process. We can naively INSERT all rows in the buffer - with open(data_file_path, "a+") as fp: - if not self.enable_mssql_bulk_insert and\ - self.dst_engine.dialect.name.lower() == "mssql": - dump_to_sql_statement(T.insert().values( - map(lambda r: - dict(zip(self.current_ordered_table_columns, - r)), - raw_rows) - ), fp, self.dst_engine, T.name) - elif self.dst_engine.dialect.name.lower() == "oracle": - self.logger.warning( - "** BULK INSERT operation not supported by Oracle. 
" + - "Expect slow run-time.\nThis utilty should be " + - "run on the target host to descrease network " + - "latency for given this limitation...") - dump_to_oracle_insert_statements( - fp, self.dst_engine, - T.name, raw_rows, - self.current_ordered_table_columns) - else: - dump_to_csv( - fp, - T.name, - self.current_ordered_table_columns, - raw_rows, - self.dst_engine.dialect) + if not self.enable_mssql_bulk_insert and\ + self.dst_engine.dialect.name.lower() == "mssql": + dump_to_sql_statement(T.insert().values( + map(lambda r: + dict(zip(self.current_ordered_table_columns, + r)), + raw_rows) + ), fp, self.dst_engine, T.name) + elif self.dst_engine.dialect.name.lower() == "oracle": + self.logger.warning( + "** BULK INSERT operation not supported by Oracle. " + + "Expect slow run-time.\nThis utilty should be " + + "run on the target host to descrease network " + + "latency for given this limitation...") + dump_to_oracle_insert_statements( + fp, self.dst_engine, + T.name, raw_rows, + self.current_ordered_table_columns) + else: + dump_to_csv( + fp, + T.name, + self.current_ordered_table_columns, + raw_rows, + self.dst_engine.dialect) else: ######################################## # We need to upsert the data...prepare upsertDict... ######################################## + conn = self.dst_engine.connect() + upsertDict = {} self.logger.info("Gathering unique columns for upsert.") if len(pks) == 0: @@ -811,17 +807,16 @@ def dump_data(self, T_dst_exists, T, raw_rows, pks, sessionMaker): for pk in pks: uid += str(row[self.current_ordered_table_columns.index(pk)]) if upsertDict.get(uid): - with open(data_file_path, "a+") as fp: - stmt = T.update()\ - .where(and_(*tuple( - map(lambda pk: - T.columns[pk] == - row[self.current_ordered_table_columns - .index(pk)], - pks))))\ - .values(dict(zip( - self.current_ordered_table_columns, row))) - dump_to_sql_statement(stmt, fp, self.dst_engine, T.name) + stmt = T.update()\ + .where(and_(*tuple( + map(lambda pk: + T.columns[pk] == + row[self.current_ordered_table_columns + .index(pk)], + pks))))\ + .values(dict(zip( + self.current_ordered_table_columns, row))) + dump_to_sql_statement(stmt, fp, self.dst_engine, T.name) del raw_rows[r] ################################# # Insert the remaining rows... @@ -837,11 +832,11 @@ def dump_data(self, T_dst_exists, T, raw_rows, pks, sessionMaker): self.logger.info( " ({0}) -- Inserting remaining '{0}' rows." .format(str(raw_row_len))) - with open(data_file_path, "a+") as fp: - dump_to_sql_statement( - T.insert().values(raw_rows), fp, - self.dst_engine, T.name) - conn.close() + dump_to_sql_statement( + T.insert().values(raw_rows), fp, + self.dst_engine, T.name) + conn.close() + # TODO: Have a 'Create' option for each table... 
def migrate(self, destination_database_url, migrate_data=True, migrate_schema=True): @@ -871,7 +866,6 @@ def migrate(self, destination_database_url, migrate_data=True, migrate_schema=Tr raise DBApiNotFound(destination_database_url) dst_meta = MetaData() - Session = sessionmaker(bind=self.dst_engine) dst_meta.bind = self.dst_engine self.tgt_insp = reflection.Inspector.from_engine(self.dst_engine) @@ -1015,14 +1009,9 @@ def migrate(self, destination_database_url, migrate_data=True, migrate_schema=Tr if not tableCreationSuccess: continue - - # # PASS TWO - Write source data to file # - - #from pudb import set_trace;set_trace() - """""""""""""""""""""""""""""" """" *** INSERT ROWS *** """"" """""""""""""""""""""""""""""" @@ -1048,7 +1037,8 @@ def migrate(self, destination_database_url, migrate_data=True, migrate_schema=Tr # Todo: What does transform_data() do, and could it do it while dumping the data? ## self.logger.info(" ({0}) -- Transforming rows: ".format(T.name) + str(virtualStartRow) + " -> " + str(virtualEndRow) + "...({0} Total)".format(str(raw_row_len))) ## self.transform_data(T_src, raw_rows[startRow:endRow]) - self.dump_data(T_dst_exists, T, [row], pks, Session) + with open(data_file_path, "a+") as fp: + self.dump_data(T_dst_exists, T, [row], pks, fp) ####################################################### From 4f9e1643ce3a81be4b5b14544eabb40cafa57bba Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Tue, 24 Apr 2018 11:30:13 +0200 Subject: [PATCH 07/11] Fix: Special handling for 'CLOB' and 'BLOB' columns during analyze phase. Previous method crashed with and SQL error in Oracle 11.2. --- etlalchemy/ETLAlchemySource.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 15285ec..f7ec472 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -976,7 +976,11 @@ def migrate(self, destination_database_url, migrate_data=True, migrate_schema=Tr ############################## # Standardize Column Type ############################## - distinct_column_rows = self.engine.execute(select([column]).distinct()) + + if column.type.__class__.__name__ in ('CLOB', 'BLOB'): + distinct_column_rows = self.engine.execute(select([column]).order_by(func.length(column).desc()).limit(1)) + else: + distinct_column_rows = self.engine.execute(select([column]).distinct()) column_copy = self.standardize_column_type(column, distinct_column_rows) """""""""""""""""""""""""""""" """ *** ELIMINATION I *** """ From b33bb35256e5fac95b0049840b9c73f900c8b930 Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Mon, 30 Apr 2018 18:00:40 +0200 Subject: [PATCH 08/11] Change migration to mysql from using sql to csv file and continue to use mysqlimport with a new parameter '--replace' that will update or insert based on the primary key. --- etlalchemy/ETLAlchemySource.py | 41 +++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index f7ec472..333782a 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -578,12 +578,12 @@ def create_table(self, T_dst_exists, T): return True # We need to Upsert the data... 
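# Isolating the CLOB/BLOB handling introduced in PATCH 07 above: Oracle rejects
# SELECT DISTINCT on LOB columns, so the analyze pass probes only the single
# longest value instead. A sketch under the same assumptions (an `engine` and a
# reflected `column`), using stock SQLAlchemy select()/func:
from sqlalchemy import select, func

if column.type.__class__.__name__ in ("CLOB", "BLOB"):
    sample = engine.execute(
        select([column]).order_by(func.length(column).desc()).limit(1))
else:
    sample = engine.execute(select([column]).distinct())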
- def send_data(self, table, columns): + def send_data(self, T_dst_exists, table_name, columns): Session = sessionmaker(bind=self.dst_engine) session = Session() - data_file_path = os.getcwd() + "/" + table + ".sql" + data_file_path = os.getcwd() + "/" + table_name + ".sql" - self.logger.info("Transferring data from local file '{0}' to target DB".format(table + ".sql")) + self.logger.info("Transferring data from local file '{0}' to target DB".format(table_name + ".sql")) if self.dst_engine.dialect.name.lower() == "mssql": username = self.dst_engine.url.username password = self.dst_engine.url.password @@ -611,7 +611,7 @@ def send_data(self, table, columns): conn.execute("""BULK INSERT {0} FROM '{1}' WITH ( fieldterminator = '|,', rowterminator = '\n' - );""".format(data_file_path, table)) + );""".format(data_file_path, table_name)) t1.commit() except sqlalchemy.exc.ProgrammingError as e: self.logger.critical(""" @@ -637,7 +637,13 @@ def send_data(self, table, columns): db_name = self.dst_engine.url.database host = self.dst_engine.url.host self.logger.info("Sending data to target MySQL instance...(Fast [mysqlimport])") - columns = map(lambda c: "\`{0}\`".format(c), columns) + # columns = map(lambda c: "\`{0}\`".format(c), columns) + + if T_dst_exists: + replace_parameter = '--replace' + else: + replace_parameter = '' + cmd = ("mysqlimport -v -h{0} -u'{1}' -p'{2}' " "--compress " "--local " @@ -646,11 +652,15 @@ def send_data(self, table, columns): "--fields-escaped-by='\\' " # "--columns={3} " "--lines-terminated-by=\"\n\" " + "{5} " "{3} {4}" - ).format(host, username, password, - #",".join(columns), db_name, - db_name, - data_file_path) + ).format(host, + username, + password, + #",".join(columns), db_name, + db_name, + data_file_path, + replace_parameter) self.logger.info(cmd) os.system(cmd) self.logger.info("Done.") @@ -677,7 +687,7 @@ def send_data(self, table, columns): cmd = """COPY {0} ({1}) FROM '{2}' WITH CSV QUOTE '''' ESCAPE '\\' """.format( - table, ",".join(columns), data_file_path, "'") + table_name, ",".join(columns), data_file_path, "'") self.logger.info( "Sending data to target Postgresql instance..." + "(Fast [COPY ... FROM ... WITH CSV]):" + @@ -690,7 +700,7 @@ def send_data(self, table, columns): quote = "\'" #escape = '/' copy_from_stmt = "COPY \"{0}\" FROM STDIN WITH CSV NULL '{1}'"\ - .format(table, null_value, quote, delimiter) + .format(table_name, null_value, quote, delimiter) cur.copy_expert(copy_from_stmt, fp_psql) #columns=tuple(map(lambda c: '"'+str(c)+'"', columns))) conn.commit() @@ -701,7 +711,7 @@ def send_data(self, table, columns): db_name = self.dst_engine.url.database self.logger.info( "Sending data to target sqlite instance...(Fast [.import])") - sqlite_cmd = ".separator \'|\'\\n.nullvalue NULL\\n.import {0} {1}".format(data_file_path, table) + sqlite_cmd = ".separator \'|\'\\n.nullvalue NULL\\n.import {0} {1}".format(data_file_path, table_name) self.logger.info(sqlite_cmd) os.system("echo \"{0}\" | sqlite3 {1}" .format(sqlite_cmd, db_name)) @@ -739,7 +749,7 @@ def dump_data(self, T_dst_exists, T, raw_rows, pks, fp): for optimized BULK IMPORT, or an INSERT query will be generated if BULK INSERTING a CSV is not supported (i.e. SQL Azure) """ - if not T_dst_exists: + if not T_dst_exists or self.dst_engine.dialect.name.lower() == "mysql": # Table "T" DNE in the destination table prior to this entire # migration process. 
We can naively INSERT all rows in the buffer if not self.enable_mssql_bulk_insert and\ @@ -1035,7 +1045,8 @@ def migrate(self, destination_database_url, migrate_data=True, migrate_schema=Tr if migrate_data and cnt > 0: - self.logger.info("Transforming & Dumping %i from table '%s' into '%s'.", cnt, T.name, data_file_path) + self.logger.info("Transforming & Dumping %i rows from table '%s' into '%s'.", + cnt, T.name, data_file_path) for row in self.engine.execute(T_src.select()): # Todo: What does transform_data() do, and could it do it while dumping the data? @@ -1050,7 +1061,7 @@ def migrate(self, destination_database_url, migrate_data=True, migrate_schema=Tr ####################################################### t_start_load = datetime.now() # From .sql - self.send_data(T.name, self.current_ordered_table_columns) + self.send_data(T_dst_exists, T.name, self.current_ordered_table_columns) t_stop_load = datetime.now() From 95529218a45c637a161c052132fbd4d4e631f58b Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Wed, 2 May 2018 14:44:36 +0200 Subject: [PATCH 09/11] Fix rename of forgotten variable. --- etlalchemy/ETLAlchemySource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 333782a..d97c8cb 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -737,7 +737,7 @@ def send_data(self, T_dst_exists, table_name, columns): else: raise Exception("Not Implemented!") # Cleanup... - self.logger.info("Cleaning up '{0}'.sql".format(table)) + self.logger.info("Cleaning up '{0}'.sql".format(table_name)) os.remove(data_file_path) self.logger.info("Done") From 2f12d5a0ad102d742a3643caadf0a0b6ec8a46d3 Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Wed, 2 May 2018 14:51:11 +0200 Subject: [PATCH 10/11] Fix: Reverted old code that specifies what order the data in the csv file is in during mysqlimport. --- etlalchemy/ETLAlchemySource.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index d97c8cb..03638ca 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -637,7 +637,7 @@ def send_data(self, T_dst_exists, table_name, columns): db_name = self.dst_engine.url.database host = self.dst_engine.url.host self.logger.info("Sending data to target MySQL instance...(Fast [mysqlimport])") - # columns = map(lambda c: "\`{0}\`".format(c), columns) + columns = map(lambda c: "\`{0}\`".format(c), columns) if T_dst_exists: replace_parameter = '--replace' @@ -650,14 +650,14 @@ def send_data(self, T_dst_exists, table_name, columns): "--fields-terminated-by=\",\" " "--fields-enclosed-by='\"' " "--fields-escaped-by='\\' " - # "--columns={3} " + "--columns={3} " "--lines-terminated-by=\"\n\" " - "{5} " - "{3} {4}" + "{6} " + "{4} {5}" ).format(host, username, password, - #",".join(columns), db_name, + ",".join(columns), db_name, data_file_path, replace_parameter) From 950521a88232e51501474b1e7e462d256cb373ed Mon Sep 17 00:00:00 2001 From: Jostein Leira Date: Thu, 3 May 2018 14:36:44 +0200 Subject: [PATCH 11/11] Fix: Added escaping of backslash character during creation of CSV file for MySQL.. 
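With this change a backslash inside a value is doubled in the CSV (embedded
double quotes are still doubled as before), which matches the
--fields-enclosed-by='"' and --fields-escaped-by='\\' options that send_data()
passes to mysqlimport. A quick illustration of the expected behaviour; the
MySQL dialect object is constructed here purely for the example:

    from sqlalchemy.dialects import mysql
    from etlalchemy.literal_value_generator import _generate_literal_value_for_csv

    print(_generate_literal_value_for_csv('C:\\tmp\\new "dir"', mysql.dialect()))
    # expected output: "C:\\tmp\\new ""dir"""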
--- etlalchemy/literal_value_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etlalchemy/literal_value_generator.py b/etlalchemy/literal_value_generator.py index 9baf11c..b0e8fbb 100644 --- a/etlalchemy/literal_value_generator.py +++ b/etlalchemy/literal_value_generator.py @@ -15,7 +15,7 @@ def _generate_literal_value_for_csv(value, dialect): # No support for 'quote' enclosed strings return "%s" % value else: - value = value.replace('"', '""') + value = value.replace('"', '""').replace('\\','\\\\') return "\"%s\"" % value elif value is None: return "NULL"
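Taken together, the series can be exercised end to end by calling
ETLAlchemySource.migrate() directly (the higher-level ETLAlchemyTarget wrapper
can be used as well); the connection URLs below are placeholders. An Oracle
source goes through the new CLOB/BLOB analyze path, and a MySQL destination
goes through the CSV escaping plus the mysqlimport --replace import:

    from etlalchemy import ETLAlchemySource

    source = ETLAlchemySource("oracle+cx_oracle://scott:tiger@dbhost/orcl")
    source.migrate("mysql://etl_user:etl_pass@dbhost/target_db")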