Skip to content
This repository has been archived by the owner on May 17, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into 859
Browse files Browse the repository at this point in the history
  • Loading branch information
sar009 authored Feb 7, 2024
2 parents f4df144 + cac665b commit 3b0b7cc
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 62 deletions.
8 changes: 6 additions & 2 deletions data_diff/cloud/datafold_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,22 @@ class TSummaryResultSchemaStats(pydantic.BaseModel):
exclusive_columns: Tuple[List[str], List[str]]


class TSummaryResultDependencyDetails(pydantic.BaseModel):
deps: Dict[str, List[Dict]]


class TCloudApiDataDiffSummaryResult(pydantic.BaseModel):
status: str
pks: Optional[TSummaryResultPrimaryKeyStats]
values: Optional[TSummaryResultValueStats]
schema_: Optional[TSummaryResultSchemaStats]
dependencies: Optional[Dict[str, Any]]
deps: Optional[TSummaryResultDependencyDetails]

@classmethod
def from_orm(cls, obj: Any) -> Self:
pks = TSummaryResultPrimaryKeyStats(**obj["pks"]) if "pks" in obj else None
values = TSummaryResultValueStats(**obj["values"]) if "values" in obj else None
deps = obj["deps"] if "deps" in obj else None
deps = TSummaryResultDependencyDetails(**obj["dependencies"]) if "dependencies" in obj else None
schema = TSummaryResultSchemaStats(**obj["schema"]) if "schema" in obj else None
return cls(
status=obj["status"],
Expand Down
2 changes: 2 additions & 0 deletions data_diff/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ def query_table_schema(self, path: DbPath) -> Dict[str, RawColumnInfo]:
accessing the schema using a SQL query.
"""
rows = self.query(self.select_table_schema(path), list, log_message=path)

if not rows:
raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")

Expand All @@ -1060,6 +1061,7 @@ def query_table_schema(self, path: DbPath) -> Dict[str, RawColumnInfo]:
)
for r in rows
}

assert len(d) == len(rows)
return d

Expand Down
6 changes: 5 additions & 1 deletion data_diff/databases/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,16 @@ def select_table_schema(self, path: DbPath) -> str:
database, schema, table = self._normalize_table_path(path)

info_schema_path = ["information_schema", "columns"]

if database:
info_schema_path.insert(0, database)
dynamic_database_clause = f"'{database}'"
else:
dynamic_database_clause = "current_catalog()"

return (
f"SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale FROM {'.'.join(info_schema_path)} "
f"WHERE table_name = '{table}' AND table_schema = '{schema}'"
f"WHERE table_name = '{table}' AND table_schema = '{schema}' and table_catalog = {dynamic_database_clause}"
)

def _normalize_table_path(self, path: DbPath) -> DbPath:
Expand Down
77 changes: 57 additions & 20 deletions data_diff/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,23 @@ def _local_diff(
k for k, v in table2_columns.items() if k in table1_columns and v.data_type != table1_columns[k].data_type
}

if columns_added:
diff_output_str += columns_added_template(columns_added)
diff_output_str += f"Primary Keys: {diff_vars.primary_keys} \n"

if diff_vars.where_filter:
diff_output_str += f"Where Filter: '{str(diff_vars.where_filter)}' \n"

if diff_vars.include_columns:
diff_output_str += f"Included Columns: {diff_vars.include_columns} \n"

if diff_vars.exclude_columns:
diff_output_str += f"Excluded Columns: {diff_vars.exclude_columns} \n"

if columns_removed:
diff_output_str += columns_removed_template(columns_removed)

if columns_added:
diff_output_str += columns_added_template(columns_added)

if columns_type_changed:
diff_output_str += columns_type_changed_template(columns_type_changed)
column_set = column_set.difference(columns_type_changed)
Expand Down Expand Up @@ -330,13 +341,14 @@ def _local_diff(
return

dataset1_columns = [
(name, type_, table1.database.dialect.parse_type(table1.table_path, name, type_, *other))
for (name, type_, *other) in table1_columns.values()
(info.column_name, info.data_type, table1.database.dialect.parse_type(table1.table_path, info))
for info in table1_columns.values()
]
dataset2_columns = [
(name, type_, table2.database.dialect.parse_type(table2.table_path, name, type_, *other))
for (name, type_, *other) in table2_columns.values()
(info.column_name, info.data_type, table2.database.dialect.parse_type(table2.table_path, info))
for info in table2_columns.values()
]

print(
json.dumps(
jsonify(
Expand Down Expand Up @@ -436,32 +448,57 @@ def _cloud_diff(
rows_removed_count = diff_results.pks.exclusives[0]

rows_updated = diff_results.values.rows_with_differences
total_rows = diff_results.values.total_rows
rows_unchanged = int(total_rows) - int(rows_updated)
total_rows_table1 = diff_results.pks.total_rows[0]
total_rows_table2 = diff_results.pks.total_rows[1]
total_rows_diff = total_rows_table2 - total_rows_table1

rows_unchanged = int(total_rows_table1) - int(rows_updated) - int(rows_removed_count)
diff_percent_list = {
x.column_name: str(x.match) + "%" for x in diff_results.values.columns_diff_stats if x.match != 100.0
x.column_name: f"{str(round(100.00 - x.match, 2))}%"
for x in diff_results.values.columns_diff_stats
if x.match != 100.0
}
columns_added = diff_results.schema_.exclusive_columns[1]
columns_removed = diff_results.schema_.exclusive_columns[0]
columns_added = set(diff_results.schema_.exclusive_columns[1])
columns_removed = set(diff_results.schema_.exclusive_columns[0])
column_type_changes = diff_results.schema_.column_type_differs

if columns_added:
diff_output_str += columns_added_template(columns_added)
diff_output_str += f"Primary Keys: {diff_vars.primary_keys} \n"
if diff_vars.where_filter:
diff_output_str += f"Where Filter: '{str(diff_vars.where_filter)}' \n"

if diff_vars.include_columns:
diff_output_str += f"Included Columns: {diff_vars.include_columns} \n"

if diff_vars.exclude_columns:
diff_output_str += f"Excluded Columns: {diff_vars.exclude_columns} \n"

if columns_removed:
diff_output_str += columns_removed_template(columns_removed)

if columns_added:
diff_output_str += columns_added_template(columns_added)

if column_type_changes:
diff_output_str += columns_type_changed_template(column_type_changes)

deps_impacts = {
key: len(value) + sum(len(item.get("BiHtSync", [])) for item in value) if key == "hightouch" else len(value)
for key, value in diff_results.deps.deps.items()
}

if any([rows_added_count, rows_removed_count, rows_updated]):
diff_output = dbt_diff_string_template(
rows_added_count,
rows_removed_count,
rows_updated,
str(rows_unchanged),
diff_percent_list,
"Value Match Percent:",
total_rows_table1=total_rows_table1,
total_rows_table2=total_rows_table2,
total_rows_diff=total_rows_diff,
rows_added=rows_added_count,
rows_removed=rows_removed_count,
rows_updated=rows_updated,
rows_unchanged=str(rows_unchanged),
deps_impacts=deps_impacts,
is_cloud=True,
extra_info_dict=diff_percent_list,
extra_info_str="Value Changed:",
)
diff_output_str += f"\n{diff_url}\n {diff_output} \n"
rich.print(diff_output_str)
Expand Down Expand Up @@ -505,7 +542,7 @@ def _cloud_diff(


def _diff_output_base(dev_path: str, prod_path: str) -> str:
return f"\n[green]{prod_path} <> {dev_path}[/] \n"
return f"\n[blue]{prod_path}[/] <> [green]{dev_path}[/] \n"


def _initialize_events(dbt_user_id: Optional[str], dbt_version: Optional[str], dbt_project_id: Optional[str]) -> None:
Expand Down
17 changes: 11 additions & 6 deletions data_diff/diff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,19 @@ def _get_stats(self, is_dbt: bool = False) -> DiffStats:
def get_stats_string(self, is_dbt: bool = False):
diff_stats = self._get_stats(is_dbt)

total_rows_diff = diff_stats.table2_count - diff_stats.table1_count

if is_dbt:
string_output = dbt_diff_string_template(
diff_stats.diff_by_sign["+"],
diff_stats.diff_by_sign["-"],
diff_stats.diff_by_sign["!"],
diff_stats.unchanged,
diff_stats.extra_column_diffs,
"Values Updated:",
total_rows_table1=diff_stats.table1_count,
total_rows_table2=diff_stats.table2_count,
total_rows_diff=total_rows_diff,
rows_added=diff_stats.diff_by_sign["+"],
rows_removed=diff_stats.diff_by_sign["-"],
rows_updated=diff_stats.diff_by_sign["!"],
rows_unchanged=diff_stats.unchanged,
extra_info_dict=diff_stats.extra_column_diffs,
extra_info_str="[u]Values Changed[/u]",
)

else:
Expand Down
66 changes: 53 additions & 13 deletions data_diff/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,19 +459,59 @@ def __repr__(self) -> str:


def dbt_diff_string_template(
rows_added: str, rows_removed: str, rows_updated: str, rows_unchanged: str, extra_info_dict: Dict, extra_info_str
total_rows_table1: int,
total_rows_table2: int,
total_rows_diff: int,
rows_added: int,
rows_removed: int,
rows_updated: int,
rows_unchanged: int,
extra_info_dict: Dict,
extra_info_str: str,
is_cloud: Optional[bool] = False,
deps_impacts: Optional[Dict] = None,
) -> str:
string_output = f"\n{tabulate([[rows_added, rows_removed]], headers=['Rows Added', 'Rows Removed'])}"
# main table
main_rows = [
["Total", total_rows_table1, "", f"{total_rows_table2} [{diff_int_dynamic_color_template(total_rows_diff)}]"],
["Added", "", diff_int_dynamic_color_template(rows_added), ""],
["Removed", "", diff_int_dynamic_color_template(-rows_removed), ""],
["Different", "", rows_updated, ""],
["Unchanged", "", rows_unchanged, ""],
]

main_headers = ["rows", "PROD", "<>", "DEV"]
main_table = tabulate(main_rows, headers=main_headers)

# diffs table
diffs_rows = sorted(list(extra_info_dict.items()))

diffs_headers = ["columns", "% diff values" if is_cloud else "# diff values"]
diffs_table = tabulate(diffs_rows, headers=diffs_headers)

# deps impacts table
deps_impacts_table = ""
if deps_impacts:
deps_impacts_rows = list(deps_impacts.items())
deps_impacts_headers = ["deps", "# data assets"]
deps_impacts_table = f"\n\n{tabulate(deps_impacts_rows, headers=deps_impacts_headers)}"

# combine all tables
string_output = f"\n{main_table}\n\n{diffs_table}{deps_impacts_table}"

string_output += f"\n\nUpdated Rows: {rows_updated}\n"
string_output += f"Unchanged Rows: {rows_unchanged}\n\n"
return string_output

string_output += extra_info_str

for k, v in extra_info_dict.items():
string_output += f"\n{k}: {v}"
def diff_int_dynamic_color_template(diff_value: int) -> str:
if not isinstance(diff_value, int):
return diff_value

return string_output
if diff_value > 0:
return f"[green]+{diff_value}[/]"
elif diff_value < 0:
return f"[red]{diff_value}[/]"
else:
return "0"


def _jsons_equiv(a: str, b: str):
Expand All @@ -498,18 +538,18 @@ def diffs_are_equiv_jsons(diff: list, json_cols: dict):
return match, overriden_diff_cols


def columns_removed_template(columns_removed) -> str:
columns_removed_str = f"Column(s) removed: {columns_removed}\n"
def columns_removed_template(columns_removed: set) -> str:
columns_removed_str = f"[red]Columns removed [-{len(columns_removed)}]:[/] [blue]{columns_removed}[/]\n"
return columns_removed_str


def columns_added_template(columns_added) -> str:
columns_added_str = f"Column(s) added: {columns_added}\n"
def columns_added_template(columns_added: set) -> str:
columns_added_str = f"[green]Columns added [+{len(columns_added)}]: {columns_added}[/]\n"
return columns_added_str


def columns_type_changed_template(columns_type_changed) -> str:
columns_type_changed_str = f"Type change: {columns_type_changed}\n"
columns_type_changed_str = f"Type changed [{len(columns_type_changed)}]: [green]{columns_type_changed}[/]\n"
return columns_type_changed_str


Expand Down
2 changes: 1 addition & 1 deletion data_diff/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.10.1"
__version__ = "0.11.0"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "data-diff"
version = "0.10.1"
version = "0.11.0"
description = "Command-line tool and Python library to efficiently diff rows across two different databases."
authors = ["Datafold <[email protected]>"]
license = "MIT"
Expand Down
Binary file modified tests/dbt_artifacts/jaffle_shop.duckdb
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/dbt_artifacts/target/manifest.json

Large diffs are not rendered by default.

Loading

0 comments on commit 3b0b7cc

Please sign in to comment.