Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Readme and improved support for DLT #45

Merged
merged 5 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 364 additions & 4 deletions README.md

Large diffs are not rendered by default.

Binary file added docs/dqx.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/dqx_lakehouse.png

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Should it not use the standard naming of bronze-silver-gold rather than raw-curated-final?

Copy link
Contributor Author

@mwojtyczka mwojtyczka Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, better to use our standard naming convention
corrected

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/dqx_quarantine.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: CPython",
]
dependencies = ["databricks-labs-blueprint[yaml]~=0.4.4"]
dependencies = ["databricks-labs-blueprint[yaml]>=0.8,<0.9"]

[project.urls]
Issues = "https://github.com/databrickslabs/dqx/issues"
Expand Down Expand Up @@ -74,6 +74,9 @@ verify = ["black --check .",
[tool.isort]
profile = "black"

[tool.mypy]
exclude = ['venv', '.venv']

[tool.pytest.ini_options]
addopts = "--no-header"
cache_dir = ".venv/pytest-cache"
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/dqx/col_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def make_condition(condition: Column, message: Column | str, alias: str) -> Colu
return (F.when(condition, msg_col).otherwise(F.lit(None).cast("string"))).alias(_cleanup_alias_name(alias))


def _cleanup_alias_name(col_name: str):
def _cleanup_alias_name(col_name: str) -> str:
# avoid issues with structs
return col_name.replace(".", "_")

Expand Down
22 changes: 20 additions & 2 deletions src/databricks/labs/dqx/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,30 @@ def apply_checks_and_split(df: DataFrame, checks: list[DQRule]) -> tuple[DataFra

checked_df = apply_checks(df, checks)

good_df = checked_df.where(F.col(Columns.ERRORS.value).isNull()).drop(Columns.ERRORS.value, Columns.WARNINGS.value)
bad_df = checked_df.where(F.col(Columns.ERRORS.value).isNotNull() | F.col(Columns.WARNINGS.value).isNotNull())
good_df = get_valid(checked_df)
bad_df = get_invalid(checked_df)

return good_df, bad_df


def get_invalid(df: DataFrame) -> DataFrame:
    """
    Get invalid records only (errors and warnings).

    A record is considered invalid when either the errors or the warnings
    result column is not null. The errors/warnings result columns are kept
    in the returned DataFrame so callers can inspect why each row failed.

    @param df: input DataFrame that has already been checked (contains the
        errors and warnings result columns)
    @return: DataFrame containing only the rows with errors or warnings
    """
    return df.where(F.col(Columns.ERRORS.value).isNotNull() | F.col(Columns.WARNINGS.value).isNotNull())


def get_valid(df: DataFrame) -> DataFrame:
    """
    Get valid records only (errors only).

    A record is considered valid when the errors result column is null;
    warnings do not make a record invalid. Note that the errors and
    warnings result columns are dropped from the returned DataFrame.

    @param df: input DataFrame that has already been checked (contains the
        errors and warnings result columns).
    @return: DataFrame with only the valid rows, result columns removed
    """
    return df.where(F.col(Columns.ERRORS.value).isNull()).drop(Columns.ERRORS.value, Columns.WARNINGS.value)


def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]:
"""Build checks based on check specification, i.e. function name plus arguments.

Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/dqx/profiler/dlt_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re

from databricks.labs.dqx.profiler.common import val_to_str
from databricks.labs.dqx.profiler.profiler import DQProfile
from databricks.labs.dqx.profiler.engine import DQProfile

__name_sanitize_re__ = re.compile(r"[^a-zA-Z0-9]+")

Expand Down

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: I'm not sure if it is a good idea to have two "engine" files in the same project, even if it is under a different dir. It is confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed back

Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def do_cast(value: str | None, typ: T.DataType) -> Any | None:
def get_df_summary_as_dict(df: DataFrame) -> dict[str, Any]:
"""Generate summary for Dataframe & return it as dictionary with column name as a key, and dict of metric/value

:param df: dataframe to profile
:param df: dataframe to _profile
:return: dict with metrics per column
"""
sm_dict: dict[str, dict] = {}
Expand Down Expand Up @@ -233,7 +233,7 @@ def get_columns_or_fields(cols: list[T.StructField]) -> list[T.StructField]:
# TODO: split into manageable chunks
# TODO: how to handle maps, arrays & structs?
# TODO: return not only DQ rules, but also the profiling results - use named tuple?
def profile_dataframe(
def profile(
df: DataFrame, cols: list[str] | None = None, opts: dict[str, Any] | None = None
) -> tuple[dict[str, Any], list[DQProfile]]:
if opts is None:
Expand All @@ -254,12 +254,12 @@ def profile_dataframe(
max_nulls = opts.get("max_null_ratio", 0)
trim_strings = opts.get("trim_strings", True)

profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings)
_profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings)

return summary_stats, dq_rules


def profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings):
def _profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings):
# TODO: think, how we can do it in fewer passes. Maybe only for specific things, like, min_max, etc.
for field in get_columns_or_fields(df_cols):
field_name = field.name
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/dqx/profiler/generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from databricks.labs.dqx.profiler.common import val_maybe_to_str
from databricks.labs.dqx.profiler.profiler import DQProfile
from databricks.labs.dqx.profiler.engine import DQProfile


def dq_generate_is_in(col_name: str, level: str = "error", **params: dict):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_dlt_rules_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List

from databricks.labs.dqx.profiler.dlt_generator import generate_dlt_rules
from databricks.labs.dqx.profiler.profiler import DQProfile
from databricks.labs.dqx.profiler.engine import DQProfile

test_empty_rules: List[DQProfile] = []

Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from pyspark.sql import SparkSession

from databricks.labs.dqx.profiler.profiler import (
from databricks.labs.dqx.profiler.engine import (
DQProfile,
T,
get_columns_or_fields,
profile_dataframe,
profile,
)


Expand Down Expand Up @@ -82,7 +82,7 @@ def test_profiler(spark_session: SparkSession):
],
schema=inp_schema,
)
stats, rules = profile_dataframe(inp_df)
stats, rules = profile(inp_df)
# pprint.pprint(stats)
# pprint.pprint(rules)
expected_rules = [
Expand Down Expand Up @@ -114,6 +114,6 @@ def test_profiler(spark_session: SparkSession):
def test_profiler_empty_df(spark_session: SparkSession):
test_df = spark_session.createDataFrame([], "data: string")

actual_summary_stats, actual_dq_rule = profile_dataframe(test_df)
actual_summary_stats, actual_dq_rule = profile(test_df)

assert len(actual_dq_rule) == 0
2 changes: 1 addition & 1 deletion tests/unit/test_rules_generator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime

from databricks.labs.dqx.profiler.engine import DQProfile
from databricks.labs.dqx.profiler.generator import generate_dq_rules
from databricks.labs.dqx.profiler.profiler import DQProfile

test_rules = [
DQProfile(
Expand Down
Loading