From 7e20084ddb2541910e0f80ce5e656324c765e349 Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Tue, 30 Oct 2018 09:50:54 -0400 Subject: [PATCH 01/12] `ETLAlchemySource.py` uses `newTable` when it should use `new_table`. See line 12 in `schema_transformer.py` for details. --- etlalchemy/ETLAlchemySource.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index fcc7195..b63bd88 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -1206,9 +1206,9 @@ def add_indexes(self, destination_database_url): .get(table_name) column_transformer = self.schema_transformer.column_transformations\ .get(table_name) - if table_transform and table_transform.newTable not in ["", None]: + if table_transform and table_transform.new_table not in ["", None]: # Update the table_name - table_name = table_transform.newTable + table_name = table_transform.new_table this_idx_count = 0 self.logger.info("Creating indexes for '" + table_name + "'...") for i in indexes: @@ -1383,9 +1383,9 @@ def add_fks(self, destination_database_url): #################################### table_transform = self.schema_transformer.table_transformations.get( table_name) - if table_transform and table_transform.newTable not in ["", None]: + if table_transform and table_transform.new_table not in ["", None]: # Update the table_name - table_name = table_transform.newTable + table_name = table_transform.new_table self.logger.info( "Adding FKs to table '{0}' (previously {1})".format( table_name, pre_transformed_table_name)) @@ -1456,10 +1456,10 @@ def add_fks(self, destination_database_url): ref_column_transformer = \ self.schema_transformer.column_transformations.get( ref_table) - if table_transform and table_transform.newTable not in [ + if table_transform and table_transform.new_table not in [ "", None]: # Update the table_name - ref_table = 
table_transform.new_table T_ref = Table(ref_table, dst_meta) ############################ # Check that referenced table From 514f11a50771df718e90116b61fece1c2e83b8d8 Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Tue, 30 Oct 2018 10:00:48 -0400 Subject: [PATCH 02/12] Some databases (SQL Server) return `BIGINTEGER` types as a "decimal" (e.g., 1000.0) even though the value is actually a whole number. This causes trouble when migrating to other databases that expect a number without a decimal. For example, when migrating SQL Server -> PostgreSQL, the following error will be raised: `psycopg2.DataError: invalid input syntax for integer: "1000048.0"` --- etlalchemy/ETLAlchemySource.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index b63bd88..81c5615 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -298,7 +298,8 @@ def standardize_column_type(self, column, raw_rows): elif "NUMERIC" in base_classes\ or "FLOAT" in base_classes\ - or "DECIMAL" in base_classes: + or "DECIMAL" in base_classes\ + or "BIGINTEGER" in base_classes: #################################### # Check all cleaned_rows to determine # if column is decimal or integer From 8c2eaa4d31d7595ad04cc812afe0e4b81e535c23 Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Tue, 30 Oct 2018 14:41:21 -0400 Subject: [PATCH 03/12] Fixed an issue caused by the previous commit where `BigInteger` columns would be coerced to `Integer`. 
--- etlalchemy/ETLAlchemySource.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 81c5615..bb97607 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -327,7 +327,8 @@ def standardize_column_type(self, column, raw_rows): if data is not None: null = False if data.__class__.__name__ == 'Decimal' or\ - data.__class__.__name__ == 'float': + data.__class__.__name__ == 'float' or\ + data.__class__.__name__ == 'long': splt = str(data).split(".") if len(splt) == 1: intCount += 1 From 9fe4403dc9f0499ee5cb5eb3c417b5045b2fb923 Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Wed, 31 Oct 2018 11:29:47 -0400 Subject: [PATCH 04/12] Added a `per_table_buffers` kwarg (defaults to empty dict) that allows for specifying, on a per-table basis, the buffer size when fetching rows from the source database (i.e., the number of rows to fetch at a time, per table) --- etlalchemy/ETLAlchemySource.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index bb97607..30b6c80 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -50,7 +50,8 @@ def __init__(self, skip_table_if_empty=False, skip_column_if_empty=False, compress_varchar=False, - log_file=None): + log_file=None, + per_table_buffers={}): # TODO: Store unique columns in here, and ADD the unique constraints # after data has been migrated, rather than before self.unique_columns = [] @@ -59,6 +60,9 @@ def __init__(self, self.logger = logging.getLogger("ETLAlchemySource") self.logger.propagate = False + #Allow specifying of buffer size on a per-table basis when fetching rows from the source + self.per_table_buffers = per_table_buffers + for h in list(self.logger.handlers): # Clean up any old loggers...(useful during testing w/ multiple # log_files) @@ -1007,6 +1011,9 @@ def migrate( 
self.logger.info("Loading all rows into memory...") rows = [] + if T_src.name in self.per_table_buffers: + buffer_size = self.per_table_buffers.get(T_src.name) + for i in range(1, (cnt / buffer_size) + 1): self.logger.info( "Fetched {0} rows".format(str(i * buffer_size))) From a26e184d48e8115d4d492fd6bbfd317fb75472a5 Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 08:26:26 -0500 Subject: [PATCH 05/12] Reversed the changes to ETLAlchemySource.py made by a previous commit (more on that later). Fixed an issue with schema_transformer.py that causes tables to not be renamed per the table_schema_transformation_file. --- etlalchemy/ETLAlchemySource.py | 6 ++---- etlalchemy/schema_transformer.py | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 30b6c80..ef2e32d 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -302,8 +302,7 @@ def standardize_column_type(self, column, raw_rows): elif "NUMERIC" in base_classes\ or "FLOAT" in base_classes\ - or "DECIMAL" in base_classes\ - or "BIGINTEGER" in base_classes: + or "DECIMAL" in base_classes: #################################### # Check all cleaned_rows to determine # if column is decimal or integer @@ -331,8 +330,7 @@ def standardize_column_type(self, column, raw_rows): if data is not None: null = False if data.__class__.__name__ == 'Decimal' or\ - data.__class__.__name__ == 'float' or\ - data.__class__.__name__ == 'long': + data.__class__.__name__ == 'float': splt = str(data).split(".") if len(splt) == 1: intCount += 1 diff --git a/etlalchemy/schema_transformer.py b/etlalchemy/schema_transformer.py index 73f6b00..d029c84 100644 --- a/etlalchemy/schema_transformer.py +++ b/etlalchemy/schema_transformer.py @@ -81,7 +81,7 @@ def __init__(self, column_transform_file, # Returns False if deleted... 
def transform_table(self, table): - thisTableTT = self.table_transformations.get(table.name.lower()) + thisTableTT = self.table_transformations.get(table.name) # Update table name if thisTableTT: if thisTableTT.delete: From 1eafcb4140f46bfd3c9c0c5f2f86818d1eb3b1eb Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 09:10:15 -0500 Subject: [PATCH 06/12] Added support for PostgreSQL UUID types --- etlalchemy/literal_value_generator.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/etlalchemy/literal_value_generator.py b/etlalchemy/literal_value_generator.py index 91bb68a..a507419 100644 --- a/etlalchemy/literal_value_generator.py +++ b/etlalchemy/literal_value_generator.py @@ -1,6 +1,7 @@ import shutil import decimal import datetime +import uuid # Find the best implementation available on this platform try: from cStringIO import StringIO @@ -19,6 +20,10 @@ def _generate_literal_value_for_csv(value, dialect): return "\"%s\"" % value elif value is None: return "NULL" + elif isinstance(value, bytearray): + u = uuid.UUID(bytes_le=str(value)) + print(u) + return str(u) elif isinstance(value, bool): return "%s" % int(value) elif isinstance(value, (float, int, long)): From 5a0437c6ea765fb0b73956a7d58ce58cc7d2643e Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 09:56:31 -0500 Subject: [PATCH 07/12] Removed debugging print statement --- etlalchemy/literal_value_generator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/etlalchemy/literal_value_generator.py b/etlalchemy/literal_value_generator.py index a507419..36c4740 100644 --- a/etlalchemy/literal_value_generator.py +++ b/etlalchemy/literal_value_generator.py @@ -21,9 +21,7 @@ def _generate_literal_value_for_csv(value, dialect): elif value is None: return "NULL" elif isinstance(value, bytearray): - u = uuid.UUID(bytes_le=str(value)) - print(u) - return str(u) + return str(uuid.UUID(bytes_le=str(value))) elif isinstance(value, bool): return "%s" % 
int(value) elif isinstance(value, (float, int, long)): From ee7d45ee3108b59236c7bf9fb36cefc266012421 Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 12:22:57 -0500 Subject: [PATCH 08/12] We can now handle UNIQUEIDENTIFIER types (MSSQL -> PostgreSQL) correctly. --- etlalchemy/ETLAlchemySource.py | 7 ++++++- etlalchemy/literal_value_generator.py | 4 +--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index ef2e32d..af9c72a 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -21,7 +21,7 @@ from sqlalchemy.inspection import inspect from sqlalchemy.exc import NoSuchTableError from sqlalchemy.types import Text, Numeric, BigInteger, Integer, DateTime, Date, TIMESTAMP, String, BINARY, LargeBinary -from sqlalchemy.dialects.postgresql import BYTEA +from sqlalchemy.dialects.postgresql import BYTEA, UUID import inspect as ins import re import csv @@ -405,6 +405,11 @@ def standardize_column_type(self, column, raw_rows): "coercing to Boolean'") column_copy.type.__class__ = sqlalchemy.types.Boolean elif "TYPEENGINE" in base_classes: + if self.dst_engine.dialect.name.lower() == "postgresql"\ + and column.type.__class__.__name__ == "UNIQUEIDENTIFIER": + column_copy.type = UUID() + self.logger.warning("Found column of type 'UNIQUEIDENTIFIER' -> " + + "coercing to 'UUID'") for r in raw_rows: if r[idx] is not None: null = False diff --git a/etlalchemy/literal_value_generator.py b/etlalchemy/literal_value_generator.py index 36c4740..bb8fb35 100644 --- a/etlalchemy/literal_value_generator.py +++ b/etlalchemy/literal_value_generator.py @@ -1,7 +1,7 @@ import shutil import decimal import datetime -import uuid + # Find the best implementation available on this platform try: from cStringIO import StringIO @@ -20,8 +20,6 @@ def _generate_literal_value_for_csv(value, dialect): return "\"%s\"" % value elif value is None: return "NULL" - elif isinstance(value, 
bytearray): - return str(uuid.UUID(bytes_le=str(value))) elif isinstance(value, bool): return "%s" % int(value) elif isinstance(value, (float, int, long)): From cfd0f57fcb8019d47923145dcec1d154c55c07bd Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 12:47:36 -0500 Subject: [PATCH 09/12] In the event of a varchar(max), we try to create a varchar column with the maximum allowable size, depending on our target database. --- etlalchemy/ETLAlchemySource.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index af9c72a..544b20f 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -209,6 +209,13 @@ def standardize_column_type(self, column, raw_rows): # Get the VARCHAR size of the column... ######################################## varchar_length = column.type.length + if varchar_length == 'max': + if self.dst_engine.dialect.name.lower() == "postgresql": + varchar_length = 10485760 + elif self.dst_engine.dialect.name.lower() == "mssql": + # Note: This isn't always the case for mssql! + # If using utf8, the limit is 21844. + varchar_length = 65532 ################################## # Strip collation here ... ################################## From 63c0f52dc854cda9c1eab933e57b245ca183ac5a Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 14:01:20 -0500 Subject: [PATCH 10/12] If varchar_length exceeds the maximum size for our target database, then convert VARCHAR -> TEXT --- etlalchemy/ETLAlchemySource.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 544b20f..49367b5 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -209,13 +209,16 @@ def standardize_column_type(self, column, raw_rows): # Get the VARCHAR size of the column... 
######################################## varchar_length = column.type.length - if varchar_length == 'max': - if self.dst_engine.dialect.name.lower() == "postgresql": - varchar_length = 10485760 - elif self.dst_engine.dialect.name.lower() == "mssql": + # If varchar_length exceeds the maximum size for our target + # database, then convert VARCHAR -> TEXT + if self.dst_engine.dialect.name.lower() == "postgresql": + if varchar_length == 'max' or varchar_length > 10485760: + varchar_length = 0 + elif self.dst_engine.dialect.name.lower() == "mssql": + if varchar_length == 'max' or varchar_length > 65532: # Note: This isn't always the case for mssql! - # If using utf8, the limit is 21844. - varchar_length = 65532 + # If using utf8, the limit is 21844. + varchar_length = 0 ################################## # Strip collation here ... ################################## From 5b0b408451074e039b083a958fbb42933511a94a Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 5 Nov 2018 16:23:02 -0500 Subject: [PATCH 11/12] Updated the required version of SQLAlchemy-Utils to the latest version. The previous required version causes this error: ValueError: invalid literal for int() with base 10: '5 (Ubuntu 10' when used with PostgreSQL >= 10 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 449ef46..386011c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ py==1.4.31 six==1.9.0 SQLAlchemy==1.0.13 sqlalchemy-migrate==0.9.7 -SQLAlchemy-Utils==0.30.9 +SQLAlchemy-Utils==0.33.6 From 161647c977d10830d83b9ed070e2f0c2176d337f Mon Sep 17 00:00:00 2001 From: Charles Hamilton Date: Mon, 22 Apr 2019 08:18:58 -0400 Subject: [PATCH 12/12] Added some things to the TODO list. 
--- etlalchemy/ETLAlchemySource.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/etlalchemy/ETLAlchemySource.py b/etlalchemy/ETLAlchemySource.py index 49367b5..4db6eef 100644 --- a/etlalchemy/ETLAlchemySource.py +++ b/etlalchemy/ETLAlchemySource.py @@ -263,8 +263,20 @@ def standardize_column_type(self, column, raw_rows): # Get the VARCHAR size of the column... ######################################## varchar_length = column.type.length - column_copy.type = String() - column_copy.type.length = varchar_length + if varchar_length == 'max': + varchar_length = 0 + column_copy.type = Text() + elif self.dst_engine.dialect.name.lower() == "postgresql" and varchar_length > 10485760: + varchar_length = 0 + column_copy.type = Text() + elif self.dst_engine.dialect.name.lower() == "mssql" and varchar_length > 65532: + # Note: This isn't always the case for mssql! + # If using utf8, the limit is 21844. + varchar_length = 0 + column_copy.type = Text() + else: + column_copy.type = String() + column_copy.type.length = varchar_length ################################## # Strip collation here ... ################################## @@ -341,6 +353,7 @@ def standardize_column_type(self, column, raw_rows): null = False if data.__class__.__name__ == 'Decimal' or\ data.__class__.__name__ == 'float': + continue # TODO. chamilton 22 April 2019: Skip this part entirely. Not ready to modify/remove/etc. just yet. splt = str(data).split(".") if len(splt) == 1: intCount += 1 @@ -394,7 +407,7 @@ def standardize_column_type(self, column, raw_rows): column.name + "' is of type 'Decimal', but contains no mantissas " + "> 0. (i.e. 3.00, 2.00, etc..)\n ") - if maxDigit > 4294967295: + if maxDigit > 4294967295: # TODO. chamilton 22 April 2019: Not sure if this is necessary. self.logger.warning("Coercing to 'BigInteger'") column_copy.type = BigInteger() # Do conversion...