From 3e13fd65c3266ef162f908b341da394f0bf79218 Mon Sep 17 00:00:00 2001 From: wendy Date: Fri, 7 Jun 2024 23:47:29 +0800 Subject: [PATCH] modified test_md and test_valid_md to run on unique dbs --- defog_utils/utils_db.py | 74 +++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 22 deletions(-) diff --git a/defog_utils/utils_db.py b/defog_utils/utils_db.py index 7bba4b0..e3398da 100644 --- a/defog_utils/utils_db.py +++ b/defog_utils/utils_db.py @@ -90,12 +90,12 @@ def clean_column_name(column_name: str) -> str: return column_name -def setup_test_db(creds: Dict[str, str]): +def setup_test_db(creds: Dict[str, str], test_db: str = TEST_DB_NAME): """ Create a test database given the credentials This is for testing custom metadata where the tables are not from defog_data/defog_data_private """ - db_name = creds.get("db_name", TEST_DB_NAME) + db_name = creds.get("db_name", test_db) try: conn = psycopg2.connect( dbname="postgres", @@ -119,6 +119,33 @@ def setup_test_db(creds: Dict[str, str]): conn.close() +def delete_test_db(creds: Dict[str, str], test_db: str = TEST_DB_NAME): + """ + Delete a test database given the credentials + This is for testing custom metadata where the tables are not from defog_data/defog_data_private + """ + db_name = creds.get("db_name", test_db) + try: + conn = psycopg2.connect( + dbname="postgres", + user=creds["user"], + password=creds["password"], + host=creds["host"], + port=creds["port"], + ) + cur = conn.cursor() + conn.autocommit = True # Disabling autocommit mode to ensure that DROP DATABASE is not executed within a transaction else it will fail + cur.execute(f"DROP DATABASE IF EXISTS {db_name}") + print(f"Database {db_name} deleted") + except Exception as e: + print(e) + finally: + if "cur" in locals() or "cur" in globals(): + cur.close() + if "conn" in locals() or "conn" in globals(): + conn.close() + + ######################################## ### Metadata Related Functions Below ### ######################################## @@ -258,30 +285,32 @@ def fix_md(md: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[Dict[str, str] def test_valid_md( - sql: str, md: dict, creds: dict, verbose: bool = False + sql: str, md: dict, creds: dict, verbose: bool = False, idx: str = "" ) -> Tuple[bool, Optional[Exception]]: """ Test a sql query with custom metadata This will perform the following steps: - 1. Delete the tables in the metadata if any of it exists + 1. Delete the test database if it exists + 2. Create the test database 2. Create the tables in the metadata (to ensure that similarly named tables from previous tests are not used) 3. Run the sql query - 4. Delete the tables + 4. Delete the test database We return the results of the query, and any errors that occur + Set idx for parallel processing to avoid conflicts """ try: + # delete the test database if it exists + delete_test_db(creds, TEST_DB_NAME + idx) + # create database if non-existent + setup_test_db(creds, TEST_DB_NAME + idx) conn = psycopg2.connect( - dbname=TEST_DB_NAME, + dbname=TEST_DB_NAME + idx, user=creds["user"], password=creds["password"], host=creds["host"], port=creds["port"], ) cur = conn.cursor() - delete_ddl = mk_delete_ddl(md) - cur.execute(delete_ddl) - if verbose: - print(delete_ddl) create_ddl = mk_create_ddl(md) cur.execute(create_ddl) if verbose: @@ -292,9 +321,6 @@ def test_valid_md( for row in results: print(row) valid, err = True, None - cur.execute(delete_ddl) - if verbose: - print(delete_ddl) except Exception as e: valid, err = False, e finally: @@ -302,30 +328,31 @@ def test_valid_md( cur.close() if "conn" in locals() or "conn" in globals(): conn.close() + delete_test_db(creds, TEST_DB_NAME + idx) return valid, err -def test_md(pruned_md: Dict, creds: Dict, verbose=False): +def test_md(pruned_md: Dict, creds: Dict, verbose: bool = False, idx: str = ""): """ Given a metadata dictionary and credentials, test if the tables can be created and deleted. Mostly to check for invalid formats/types in the metadata. + Set index for parallel processing to avoid conflicts """ try: + # delete the test database if it exists + delete_test_db(creds, TEST_DB_NAME + idx) + # create database if non-existent + setup_test_db(creds, TEST_DB_NAME + idx) conn = psycopg2.connect( - dbname=TEST_DB_NAME, + dbname=TEST_DB_NAME + idx, user=creds["user"], password=creds["password"], host=creds["host"], port=creds["port"], ) cur = conn.cursor() - delete_ddl = mk_delete_ddl(pruned_md) - cur.execute(delete_ddl) create_ddl = mk_create_ddl(pruned_md) cur.execute(create_ddl) - cur.execute(delete_ddl) - cur.close() - conn.close() return True except Exception as e: if verbose: @@ -334,10 +361,11 @@ def test_md(pruned_md: Dict, creds: Dict, verbose=False): import traceback traceback.print_exc() - if cur is not None: + if "cur" in locals() or "cur" in globals(): cur.close() - if conn is not None: + if "conn" in locals() or "conn" in globals(): conn.close() + delete_test_db(creds, TEST_DB_NAME + idx) return False @@ -494,6 +522,7 @@ def generate_aliases_dict( aliases[original_table_name] = alias return aliases + def mk_alias_str(table_aliases: Dict[str, str]) -> str: """ Given a dictionary of table names to aliases, return a string of aliases in the form: @@ -505,6 +534,7 @@ def mk_alias_str(table_aliases: Dict[str, str]) -> str: aliases_str += f"-- {table_name} AS {alias}\n" return aliases_str + def generate_aliases( table_names: List, reserved_keywords: List[str] = reserved_keywords ) -> str: