Skip to content

Commit

Permalink
modified test_md and test_valid_md to run on unique dbs
Browse files Browse the repository at this point in the history
  • Loading branch information
wendy-aw authored and wongjingping committed Jun 10, 2024
1 parent 8f798a3 commit 3e13fd6
Showing 1 changed file with 52 additions and 22 deletions.
74 changes: 52 additions & 22 deletions defog_utils/utils_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ def clean_column_name(column_name: str) -> str:
return column_name


def setup_test_db(creds: Dict[str, str]):
def setup_test_db(creds: Dict[str, str], test_db: str = TEST_DB_NAME):
"""
Create a test database given the credentials
This is for testing custom metadata where the tables are not from defog_data/defog_data_private
"""
db_name = creds.get("db_name", TEST_DB_NAME)
db_name = creds.get("db_name", test_db)
try:
conn = psycopg2.connect(
dbname="postgres",
Expand All @@ -119,6 +119,33 @@ def setup_test_db(creds: Dict[str, str]):
conn.close()


def delete_test_db(creds: Dict[str, str], test_db: str = TEST_DB_NAME):
"""
Delete a test database given the credentials
This is for testing custom metadata where the tables are not from defog_data/defog_data_private
"""
db_name = creds.get("db_name", test_db)
try:
conn = psycopg2.connect(
dbname="postgres",
user=creds["user"],
password=creds["password"],
host=creds["host"],
port=creds["port"],
)
cur = conn.cursor()
conn.autocommit = True # Disabling autocommit mode to ensure that DROP DATABASE is not executed within a transaction else it will fail
cur.execute(f"DROP DATABASE IF EXISTS {db_name}")
print(f"Database {db_name} deleted")
except Exception as e:
print(e)
finally:
if "cur" in locals() or "cur" in globals():
cur.close()
if "conn" in locals() or "conn" in globals():
conn.close()


########################################
### Metadata Related Functions Below ###
########################################
Expand Down Expand Up @@ -258,30 +285,32 @@ def fix_md(md: Dict[str, List[Dict[str, str]]]) -> Dict[str, List[Dict[str, str]


def test_valid_md(
sql: str, md: dict, creds: dict, verbose: bool = False
sql: str, md: dict, creds: dict, verbose: bool = False, idx: str = ""
) -> Tuple[bool, Optional[Exception]]:
"""
Test a sql query with custom metadata
This will perform the following steps:
1. Delete the tables in the metadata if any of it exists
1. Delete the test database if it exists
2. Create the test database
2. Create the tables in the metadata (to ensure that similarly named tables from previous tests are not used)
3. Run the sql query
4. Delete the tables
4. Delete the test database
We return the results of the query, and any errors that occur
Set idx for parallel processing to avoid conflicts
"""
try:
# delete the test database if it exists
delete_test_db(creds, TEST_DB_NAME + idx)
# create database if non-existent
setup_test_db(creds, TEST_DB_NAME + idx)
conn = psycopg2.connect(
dbname=TEST_DB_NAME,
dbname=TEST_DB_NAME + idx,
user=creds["user"],
password=creds["password"],
host=creds["host"],
port=creds["port"],
)
cur = conn.cursor()
delete_ddl = mk_delete_ddl(md)
cur.execute(delete_ddl)
if verbose:
print(delete_ddl)
create_ddl = mk_create_ddl(md)
cur.execute(create_ddl)
if verbose:
Expand All @@ -292,40 +321,38 @@ def test_valid_md(
for row in results:
print(row)
valid, err = True, None
cur.execute(delete_ddl)
if verbose:
print(delete_ddl)
except Exception as e:
valid, err = False, e
finally:
if "cur" in locals() or "cur" in globals():
cur.close()
if "conn" in locals() or "conn" in globals():
conn.close()
delete_test_db(creds, TEST_DB_NAME + idx)
return valid, err


def test_md(pruned_md: Dict, creds: Dict, verbose=False):
def test_md(pruned_md: Dict, creds: Dict, verbose: bool = False, idx: str = ""):
"""
Given a metadata dictionary and credentials, test if the tables can be created and deleted.
Mostly to check for invalid formats/types in the metadata.
Set index for parallel processing to avoid conflicts
"""
try:
# delete the test database if it exists
delete_test_db(creds, TEST_DB_NAME + idx)
# create database if non-existent
setup_test_db(creds, TEST_DB_NAME + idx)
conn = psycopg2.connect(
dbname=TEST_DB_NAME,
dbname=TEST_DB_NAME + idx,
user=creds["user"],
password=creds["password"],
host=creds["host"],
port=creds["port"],
)
cur = conn.cursor()
delete_ddl = mk_delete_ddl(pruned_md)
cur.execute(delete_ddl)
create_ddl = mk_create_ddl(pruned_md)
cur.execute(create_ddl)
cur.execute(delete_ddl)
cur.close()
conn.close()
return True
except Exception as e:
if verbose:
Expand All @@ -334,10 +361,11 @@ def test_md(pruned_md: Dict, creds: Dict, verbose=False):
import traceback

traceback.print_exc()
if cur is not None:
if "cur" in locals() or "cur" in globals():
cur.close()
if conn is not None:
if "conn" in locals() or "conn" in globals():
conn.close()
delete_test_db(creds, TEST_DB_NAME + idx)
return False


Expand Down Expand Up @@ -494,6 +522,7 @@ def generate_aliases_dict(
aliases[original_table_name] = alias
return aliases


def mk_alias_str(table_aliases: Dict[str, str]) -> str:
"""
Given a dictionary of table names to aliases, return a string of aliases in the form:
Expand All @@ -505,6 +534,7 @@ def mk_alias_str(table_aliases: Dict[str, str]) -> str:
aliases_str += f"-- {table_name} AS {alias}\n"
return aliases_str


def generate_aliases(
table_names: List, reserved_keywords: List[str] = reserved_keywords
) -> str:
Expand Down

0 comments on commit 3e13fd6

Please sign in to comment.