Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New funcs #52

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions oopgrade/oopgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
'load_access_rules_from_model_name',
'delete_record',
'load_translation',
'migrate_data_between_tables',
'check_data_integrity',
'drop_index',
'add_index',
'restore_table',
'backup_table'
]


Expand Down Expand Up @@ -789,3 +795,137 @@ def load_translation(cursor, lang, name, type, res_id, src, value):
ON CONFLICT (lang, src_md5, name, type, res_id) WHERE res_id is null DO UPDATE SET value = EXCLUDED.value
"""
cursor.execute(insert_sql, {'lang': lang, 'name': name, 'type': type, 'res_id': res_id, 'src': src, 'value': value})


def backup_table(cursor, table_name):
"""
Creates a backup of a specific table by copying its content to a temporary table.

:param cursor: Database cursor.
:param table_name: Name of the table to back up.
"""
backup_table_name = "{}_backup".format(table_name)
cursor.execute(
"CREATE TABLE {} AS TABLE {}".format(backup_table_name, table_name)
)
logger.info("Backup created for table: {}".format(table_name))


def restore_table(cursor, table_name):
"""
Restores a table from its backup.

:param cursor: Database cursor.
:param table_name: Name of the table to restore.
"""
backup_table_name = "{}_backup".format(table_name)
cursor.execute("DROP TABLE IF EXISTS {}".format(table_name))
cursor.execute("ALTER TABLE {} RENAME TO {}".format(backup_table_name, table_name))
logger.info("Table {} restored from backup".format(table_name))


def add_index(cursor, table_name, columns, unique=False):
"""
Adds an index to one or more columns in a specific table.

:param cursor: Database cursor.
:param table_name: Name of the table.
:param columns: List of columns to include in the index.
:param unique: Boolean indicating if the index should be unique.
"""
index_name = "{}_{}_idx".format(table_name, "_".join(columns))
columns_str = ", ".join(columns)
unique_str = "UNIQUE" if unique else ""
cursor.execute(
"CREATE {} INDEX {} ON {} ({})".format(unique_str, index_name, table_name, columns_str)
)
logger.info("{} index {} created on table {}".format(unique_str.capitalize(), index_name, table_name))


def drop_index(cursor, index_name):
"""
Removes a specific index.

:param cursor: Database cursor.
:param index_name: Name of the index to remove.
"""
cursor.execute(
"DROP INDEX IF EXISTS {}".format(index_name)
)
logger.info("Index {} dropped".format(index_name))


def check_data_integrity(cursor, table_name):
"""
Automatically checks for common data integrity issues in a table, such as null values in non-nullable columns
and foreign key violations.

:param cursor: Database cursor.
:param table_name: Name of the table to check.
:return: True if no issues are found, False otherwise.
"""
issues_found = False
# Check for null values in non-nullable columns
cursor.execute(
"""
SELECT column_name
FROM information_schema.columns
WHERE table_name = %s AND is_nullable = 'NO'
""", (table_name,)
)
non_nullable_columns = [row[0] for row in cursor.fetchall()]
for column in non_nullable_columns:
cursor.execute(
"SELECT COUNT(*) FROM {} WHERE {} IS NULL".format(table_name, column)
)
null_count = cursor.fetchone()[0]
if null_count > 0:
logger.warning("Integrity issue: {} null values found in non-nullable column '{}' in table {}".format(
null_count, column, table_name
))
issues_found = True

# Check for foreign key violations
cursor.execute(
"""
SELECT tc.constraint_name, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name
WHERE constraint_type = 'FOREIGN KEY' AND tc.table_name = %s
""", (table_name,)
)
foreign_keys = cursor.fetchall()
for fk in foreign_keys:
cursor.execute(
"SELECT COUNT(*) FROM {} t1 LEFT JOIN {} t2 ON t1.{} = t2.{} WHERE t2.{} IS NULL".format(
table_name, fk[2], fk[1], fk[3], fk[3]
)
)
fk_violations = cursor.fetchone()[0]
if fk_violations > 0:
logger.warning("Integrity issue: {} foreign key violations found for '{}' in table {}".format(
fk_violations, fk[1], table_name
))
issues_found = True

if not issues_found:
logger.info("No integrity issues found in table {}".format(table_name))
return not issues_found


def migrate_data_between_tables(cursor, source_table, destination_table, column_mapping):
"""
Migrates data from one table to another with column mapping.

:param cursor: Database cursor.
:param source_table: Source table.
:param destination_table: Destination table.
:param column_mapping: Dictionary mapping source columns to destination columns {source_column: destination_column}.
"""
columns_source = ', '.join(column_mapping.keys())
columns_dest = ', '.join(column_mapping.values())
cursor.execute(
"INSERT INTO {} ({}) SELECT {} FROM {}".format(destination_table, columns_dest, columns_source, source_table)
)
logger.info("Data migrated from {} to {}".format(source_table, destination_table))
102 changes: 102 additions & 0 deletions spec/oopgrade_specs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from expects import *
import six
if six.PY2:
from mock import Mock, call
else:
from unittest.mock import Mock, call

from my_module import backup_table, restore_table, add_index, drop_index, check_data_integrity, migrate_data_between_tables

with description('Database Utilities'):
with description('Backing up and restoring tables'):
with it('should create a backup of a table'):
cursor = Mock()
backup_table(cursor, 'test_table')
expected_sql = [
call("CREATE TABLE test_table_backup AS TABLE test_table"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with it('should restore a table from its backup'):
cursor = Mock()
restore_table(cursor, 'test_table')
expected_sql = [
call("DROP TABLE IF EXISTS test_table"),
call("ALTER TABLE test_table_backup RENAME TO test_table"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with description('Adding and dropping indexes'):
with it('should add a unique index on multiple columns'):
cursor = Mock()
add_index(cursor, 'test_table', ['column1', 'column2'], unique=True)
expected_sql = [
call("CREATE UNIQUE INDEX test_table_column1_column2_idx ON test_table (column1, column2)"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with it('should add a non-unique index on a single column'):
cursor = Mock()
add_index(cursor, 'test_table', ['column1'], unique=False)
expected_sql = [
call("CREATE INDEX test_table_column1_idx ON test_table (column1)"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with it('should drop an existing index'):
cursor = Mock()
drop_index(cursor, 'test_index')
expected_sql = [
call("DROP INDEX IF EXISTS test_index"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with description('Checking data integrity'):
with it('should detect null values in non-nullable columns'):
cursor = Mock()
cursor.fetchone.side_effect = [
['column1'], # Non-nullable columns
[5], # Null values found
]
result = check_data_integrity(cursor, 'test_table')
expect(result).to(be_false)
expected_sql = [
call("SELECT column_name FROM information_schema.columns WHERE table_name = %s AND is_nullable = 'NO'", ('test_table',)),
call("SELECT COUNT(*) FROM test_table WHERE column1 IS NULL"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with it('should detect foreign key violations'):
cursor = Mock()
cursor.fetchone.side_effect = [
[], # No null violations
['fk_column', 'test_table', 'ref_table', 'ref_column'], # Foreign key constraint
[3], # Foreign key violations found
]
result = check_data_integrity(cursor, 'test_table')
expect(result).to(be_false)
expected_sql = [
call("SELECT column_name FROM information_schema.columns WHERE table_name = %s AND is_nullable = 'NO'", ('test_table',)),
call("SELECT tc.constraint_name, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name WHERE constraint_type = 'FOREIGN KEY' AND tc.table_name = %s", ('test_table',)),
call("SELECT COUNT(*) FROM test_table t1 LEFT JOIN ref_table t2 ON t1.fk_column = t2.ref_column WHERE t2.ref_column IS NULL"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))

with it('should pass when there are no integrity issues'):
cursor = Mock()
cursor.fetchone.side_effect = [
[], # No null violations
[], # No foreign key constraints
]
result = check_data_integrity(cursor, 'test_table')
expect(result).to(be_true)

with description('Migrating data between tables'):
with it('should migrate data from one table to another'):
cursor = Mock()
column_mapping = {'source_col1': 'dest_col1', 'source_col2': 'dest_col2'}
migrate_data_between_tables(cursor, 'source_table', 'dest_table', column_mapping)
expected_sql = [
call("INSERT INTO dest_table (dest_col1, dest_col2) SELECT source_col1, source_col2 FROM source_table"),
]
expect(cursor.execute.call_args_list).to(contain_exactly(*expected_sql))
Loading