diff --git a/README.md b/README.md
index c699414..1ffa915 100644
--- a/README.md
+++ b/README.md
@@ -1,25 +1,416 @@
 Databricks Labs DQX
 ===
-[![codecov](https://codecov.io/github/databrickslabs/dqx/graph/badge.svg?token=x1JSVddfZa)](https://codecov.io/github/databrickslabs/dqx) [![lines of code](https://tokei.rs/b1/github/databrickslabs/dqx)]([https://codecov.io/github/databrickslabs/dqx](https://github.com/databrickslabs/dqx))
+Simplified Data Quality checking at Scale for PySpark Workloads on streaming and standard DataFrames.
+# Motivation
-...
+Current data quality frameworks often fall short in providing detailed explanations for specific row or column
+data quality issues and are primarily designed for complete datasets,
+making integration into streaming workloads difficult.
+
+This project introduces a simple Python validation framework for assessing the data quality of PySpark DataFrames.
+It enables real-time quality validation during data processing rather than relying solely on post-factum monitoring.
+The validation output includes detailed information on why specific rows and columns have issues,
+allowing for quicker identification and resolution of data quality problems.
+
+![problem](docs/dqx.png?)
+
+Invalid data can be quarantined to make sure bad data is never written to the output.
+
+![quarantine](docs/dqx_quarantine.png?)
+
+In the Lakehouse architecture, the validation of new data should happen at the time of data entry into the Curated Layer
+to make sure bad data is not propagated to subsequent layers. With DQX you can easily quarantine invalid data and re-ingest it
+after curation to ensure that data quality constraints are met.
+
+![lakehouse](docs/dqx_lakehouse.png?)
+
+For monitoring the data quality of data already persisted in a Delta table (post-factum monitoring), we recommend using
+[Databricks Lakehouse Monitoring](https://docs.databricks.com/en/lakehouse-monitoring/index.html).
+For Spark workloads written in Scala, consider the [DataFrame Rules Engine](https://github.com/databrickslabs/dataframe-rules-engine).
+
+[![build](https://github.com/databrickslabs/dqx/actions/workflows/push.yml/badge.svg)](https://github.com/databrickslabs/dqx/actions/workflows/push.yml) [![codecov](https://codecov.io/github/databrickslabs/dqx/graph/badge.svg?token=x1JSVddfZa)](https://codecov.io/github/databrickslabs/dqx) ![linesofcode](https://aschey.tech/tokei/github/databrickslabs/dqx?category=code)
 * [Databricks Labs DQX](#databricks-labs-dqx)
+* [Motivation](#motivation)
+* [Key Capabilities](#key-capabilities)
+* [Prerequisites](#prerequisites)
 * [Installation](#installation)
+* [How to use it](#how-to-use-it)
+* [Quality rules / functions](#quality-rules--functions)
+* [Contribution](#contribution)
 * [Project Support](#project-support)
+# Key Capabilities
+
+- Information on why a check has failed.
+- Data format agnostic (works on Spark DataFrames).
+- Support for Spark Batch and Streaming, including DLT (Delta Live Tables).
+- Different reactions to failed checks, e.g. drop, mark, or quarantine invalid data.
+- Support for check levels: warning (mark) or error (mark and don't propagate the rows).
+- Support for quality rules at row and column level.
+- Profiling and generation of data quality rule candidates.
+- Re-ingestion of curated data.
+- Definition of checks as code or config.
+- Validation summary and data quality dashboard for identifying and tracking data quality issues.
+
+# Prerequisites
+
+- Python 3.10 or later. See [instructions](https://www.python.org/downloads/).
+- Unity Catalog-enabled [Databricks workspace](https://docs.databricks.com/en/getting-started/index.html).
+- (Optional) Databricks CLI v0.213 or later. See [instructions](https://docs.databricks.com/dev-tools/cli/databricks-cli.html).
+- (Optional) Network access to your Databricks Workspace used for the [installation process](#installation).
+
+[[back to top](#databricks-labs-dqx)]
+
 # Installation
-You can install this project via `pip`:
+The project can be installed on a Databricks workspace or used as a standalone library.
-```
+## Installation as Library
+
+Install the project via `pip`:
+
+```commandline
 pip install databricks-labs-dqx
 ```
+## Installation in a Databricks Workspace
+
+### Authentication
+
+Once you install the Databricks CLI, authenticate your current machine to your Databricks Workspace:
+
+```commandline
+databricks auth login --host <WORKSPACE_HOST>
+```
+
+To enable debug logs, add the `--debug` flag to any command.
+Read more about authentication options [here](https://docs.databricks.com/en/dev-tools/cli/authentication.html).
+
+### Installation
+
+Install DQX in your Databricks workspace via the Databricks CLI:
+
+```commandline
+databricks labs install dqx
+```
+
+You'll be prompted to select a [configuration profile](https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication) created by the `databricks auth login` command,
+and other configuration options.
+
+The CLI command installs the following components in the workspace:
+- A Python [wheel file](https://peps.python.org/pep-0427/) containing the packaged library
+- DQX configuration file ('config.yaml')
+- Quality dashboard for monitoring
+
+By default, DQX is installed in the user home directory (under '/Users/<user>/.dqx'). You can also install DQX globally
+by setting the 'DQX_FORCE_INSTALL' environment variable. The following options are available:
+* `DQX_FORCE_INSTALL=global databricks labs install dqx`: forces the installation to be for root only ('/Applications/dqx')
+* `DQX_FORCE_INSTALL=user databricks labs install dqx`: forces the installation to be for the current user only ('/Users/<user>/.dqx')
+
+### Upgrade
+
+Verify that DQX is installed:
+
+```commandline
+databricks labs installed
+```
+
+Upgrade DQX via the Databricks CLI:
+
+```commandline
+databricks labs upgrade dqx
+```
+
+### Uninstall
+
+Uninstall DQX via the Databricks CLI:
+
+```commandline
+databricks labs uninstall dqx
+```
+
+The Databricks CLI will ask you to confirm a few options:
+- Whether you want to remove all dqx artefacts from the workspace as well. Defaults to 'no'.
+
+[[back to top](#databricks-labs-dqx)]
+
+# How to use it
+
+## Data Profiling
+
+Data profiling is run to generate quality rule candidates and provide summary statistics of the input data.
+The generated rules are used as input for quality checking (see [Adding quality checks to the application](#adding-quality-checks-to-the-application)).
+
+### In Python
+
+```python
+from databricks.labs.dqx.profiler.profiler import profile
+
+df = spark.read.table("catalog1.schema1.table1")
+summary_stats, checks = profile(df)
+```
+
+### Using CLI
+
+You must install DQX in the workspace first (see [installation](#installation-in-a-databricks-workspace)).
+
+Run the profiling job:
+```commandline
+databricks labs dqx profile
+```
+
+The following DQX configuration from 'config.yaml' is used by default:
+- 'input_location': input data as a path or a table.
+- 'checks_file': location of the generated quality rule candidates (default: `checks.json`).
+- 'profile_summary_stats_file': location of the summary statistics (default: `profile_summary.json`).
+
+You can overwrite the default parameters as follows:
+```commandline
+databricks labs dqx profile --input-location "catalog1.schema1.table1" --checks-file "checks.json" --profile-summary-stats-file "profile_summary.json"
+```
+
+## Validating quality rules (checks)
+
+If you manually adjust the generated rules or create your own configuration, you can validate them before using them in your application
+(see [Adding quality checks to the application](#adding-quality-checks-to-the-application)).
+
+### In Python
+
+```python
+from databricks.labs.dqx.engine import validate_rules
+
+valid = validate_rules(checks)  # returns a boolean
+```
+
+### Using CLI
+
+Validate checks stored in the installation folder:
+```commandline
+databricks labs dqx validate
+```
+The following DQX configuration from 'config.yaml' is used by default:
+- 'checks_file': location of the quality rules (default: `checks.json`).
+
+You can overwrite the default parameters as follows:
+```commandline
+databricks labs dqx validate --checks-file "checks.json"
+```
+
+## Adding quality checks to the application
+
+### Quality rules defined as config
+
+Quality rules ('checks.json'):
+```json
+[
+  {"criticality":"error","check":{"function":"is_not_null","arguments":{"col_names":["col1","col2"]}}},
+  {"name":"col_col3_is_null_or_empty","criticality":"error","check":{"function":"is_not_null_and_not_empty","arguments":{"col_name":"col3"}}},
+  {"criticality":"warn","check":{"function":"value_is_in_list","arguments":{"col_name":"col4","allowed":[1,2]}}}
+]
+```
+Fields:
+- `criticality`: either "error" (data goes only into the "bad/quarantine" dataframe) or "warn" (data goes into both dataframes).
+- `check`: column expression containing "function" (check function to apply), "arguments" (check function arguments), and "col_name" (single column name, as a string, to apply the check to) or "col_names" (array of column names to apply the check to).
+- `name` (optional): name of the check; autogenerated if not provided.
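+
+If you edit the generated rules by hand, it can be useful to validate them right after loading them from the file and before applying them.
+A minimal sketch combining the helpers shown in this README (assuming `load_from_file` accepts an explicit file name and `validate_rules` returns a boolean):
+
+```python
+from databricks.labs.dqx.engine import load_from_file, validate_rules
+
+# load the checks from a file (or call load_from_file() to use the location from 'config.yaml')
+checks = load_from_file("checks.json")
+
+# fail fast if any manually edited rule is malformed, before applying the checks
+if not validate_rules(checks):
+    raise ValueError("invalid quality rules in checks.json")
+```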
+
+**In the application:**
+```python
+from databricks.labs.dqx.engine import apply_checks_by_metadata, apply_checks_by_metadata_and_split
+from databricks.labs.dqx.engine import load_from_file
+
+# use 'checks' config from the 'config.yaml', or provide the filename directly
+checks = load_from_file()
+
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
+valid_df, quarantined_df = apply_checks_by_metadata_and_split(input_df, checks)
+
+# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
+valid_and_quarantined_df = apply_checks_by_metadata(input_df, checks)
+```
+
+### Quality rules defined as code
+
+**Method 1 (using a JSON-like dictionary):**
+```python
+from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
+from databricks.labs.dqx.engine import apply_checks_by_metadata, apply_checks_by_metadata_and_split
+
+checks = [
+    {
+        "check": is_not_null("col1"),
+        "name": "col1_is_null",
+        "criticality": "error"
+    },{
+        "check": is_not_null("col2"),
+        "name": "col2_is_null",
+        "criticality": "error"
+    },{
+        "check": is_not_null_and_not_empty("col3"),
+        "name": "col3_is_null_or_empty",
+        "criticality": "error"
+    },{
+        # name auto-generated if not provided
+        "check": value_is_in_list('col4', ['1', '2']),
+        "criticality": "warn"
+    },
+]
+
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
+valid_df, quarantined_df = apply_checks_by_metadata_and_split(input_df, checks)
+
+# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
+valid_and_quarantined_df = apply_checks_by_metadata(input_df, checks)
+```
+
+See details of the check functions [here](#quality-rules--functions).
+
+**Method 2 (using DQX classes):**
+```python
+from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
+from databricks.labs.dqx.engine import DQRule, DQRuleColSet, apply_checks, apply_checks_and_split
+
+checks = DQRuleColSet(  # define rule for multiple columns at once
+    columns=["col1", "col2"],
+    criticality="error",
+    check_func=is_not_null).get_rules() + [
+    DQRule(  # define rule for a single column
+        name='col3_is_null_or_empty',
+        criticality='error',
+        check=is_not_null_and_not_empty('col3')),
+    DQRule(  # name auto-generated if not provided
+        criticality='warn',
+        check=value_is_in_list('col4', ['1', '2']))
+]
+
+# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
+valid_df, quarantined_df = apply_checks_and_split(input_df, checks)
+
+# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
+valid_and_quarantined_df = apply_checks(input_df, checks)
+```
+
+See details of the check functions [here](#quality-rules--functions).
+
+### Integration with DLT (Delta Live Tables)
+
+DLT provides [expectations](https://docs.databricks.com/en/delta-live-tables/expectations.html) to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail.
+The example below demonstrates how to integrate DQX checks with DLT to provide comprehensive information on why quality checks failed.
+The integration does not use DLT expectations but applies the DQX checks directly.
+
+```python
+import dlt
+from databricks.labs.dqx.engine import apply_checks, get_invalid, get_valid
+
+checks = ...  # quality rules / checks
+
+@dlt.view
+def bronze_dq_check():
+    df = dlt.read_stream("bronze")
+    return apply_checks(df, checks)
+
+@dlt.table
+def silver():
+    df = dlt.read_stream("bronze_dq_check")
+    # get rows without errors or warnings, and drop auxiliary columns
+    return get_valid(df)
+
+@dlt.table
+def quarantine():
+    df = dlt.read_stream("bronze_dq_check")
+    # get only rows with errors or warnings
+    return get_invalid(df)
+```
+
+## Re-ingesting curated data
+
+Once the application has run, the user should analyse and investigate data quality problems,
+for example by looking at the validation summary, the quality dashboard, and/or the quarantined dataset.
+
+Once the root cause analysis is performed, the user can curate the quarantined data
+and then re-ingest it into the output by running:
+```commandline
+databricks labs dqx re-ingest-curated
+```
+
+The following DQX configuration from 'config.yaml' is used by default:
+- 'curated_location': location of the curated data as a path or table.
+- 'output_location': location of the output data as a path or table.
+
+You can overwrite the default parameters as follows:
+```commandline
+databricks labs dqx re-ingest-curated --output-location "catalog1.schema1.table1" --curated-location "catalog1.schema1.table2"
+```
+
+[[back to top](#databricks-labs-dqx)]
+
+# Quality rules / functions
+
+The following quality rules / functions are currently available:
+
+| Check | Description | Arguments |
+|-------|-------------|-----------|
+| is_not_null | Check if input column is not null | col_name: column name to check |
+| is_not_empty | Check if input column is not empty | col_name: column name to check |
+| is_not_null_and_not_empty | Check if input column is not null and not empty | col_name: column name to check; trim_strings: boolean flag to trim spaces from strings |
+| value_is_in_list | Check if the value of the input column is present in the list of allowed values | col_name: column name to check; allowed: list of allowed values |
+| value_is_not_null_and_is_in_list | Check if the input column is not null and its value is present in the list of allowed values | col_name: column name to check; allowed: list of allowed values |
+| is_in_range | Check if input column is in the provided range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit; max_limit: max limit |
+| is_not_in_range | Check if input column is not within the defined range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value |
+| not_less_than | Check if input column is not less than the provided limit | col_name: column name to check; limit: limit value |
+| not_greater_than | Check if input column is not greater than the provided limit | col_name: column name to check; limit: limit value |
+| not_in_future | Check if input column defined as date is not in the future (future defined as current_timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
+| not_in_near_future | Check if input column defined as date is not in the near future (near future defined as greater than current timestamp but less than current timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
+| is_older_than_n_days | Check if input column is older than n number of days | col_name: column name to check; days: number of days; curr_date: current date, if not provided current_date() function is used |
+| is_older_than_col2_for_n_days | Check if one column is older than another column by n number of days | col_name1: first column name to check; col_name2: second column name to check; days: number of days |
+| regex_match | Check if input column matches a given regex | col_name: column name to check; regex: regex to check; negate: if the condition should be negated (true) or not |
+| sql_expression | Check if input column matches the provided SQL expression, e.g. a = 'str1' | expression: SQL expression to check; msg: optional message to output; name: optional name of the resulting column; negate: if the condition should be negated |
+
+You can check implementation details of the rules [here](src/databricks/labs/dqx/col_functions.py).
+
+## Creating your own checks
+
+If a check that you need does not exist in DQX, you may be able to define it using the SQL expression rule (`sql_expression`),
+for example:
+```json
+[
+  {"criticality":"error","check":{"function":"sql_expression","arguments":{"expression":"a LIKE '%foo'", "msg": "a ends with foo"}}}
+]
+```
+
+Alternatively, you can define your own checks. A check is a function available from 'globals' that returns a `pyspark.sql.Column`, for example:
+
+```python
+import pyspark.sql.functions as F
+from pyspark.sql import Column
+from databricks.labs.dqx.col_functions import make_condition
+
+def ends_with_foo(col_name: str) -> Column:
+    column = F.col(col_name)
+    # make_condition flags a row when the condition is true, so rows ending with "foo" are reported
+    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
+```
+
+Then define the check, for example:
+```json
+[
+  {"criticality":"error","check":{"function":"ends_with_foo","arguments":{"col_name":"a"}}}
+]
+```
+
+You can see all existing DQX checks [here](src/databricks/labs/dqx/col_functions.py).
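+
+The same custom check can also be used when defining rules as code rather than as config.
+A minimal sketch reusing the `DQRule` and `apply_checks` API shown earlier (assuming `ends_with_foo` from above is defined or imported in the same module):
+
+```python
+from databricks.labs.dqx.engine import DQRule, apply_checks
+
+# flag rows where column "a" ends with "foo" as errors
+checks = [
+    DQRule(criticality="error", check=ends_with_foo("a")),
+]
+
+valid_and_quarantined_df = apply_checks(input_df, checks)
+```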
+
+Feel free to submit a PR to DQX with your own check so that others can benefit from it (see [contribution guide](#contribution)).
+
+[[back to top](#databricks-labs-dqx)]
+
+# Contribution
+
+See contribution guidance [here](CONTRIBUTING.md).
+
 [[back to top](#databricks-labs-dqx)]

 # Project Support
diff --git a/docs/dqx.png b/docs/dqx.png
new file mode 100644
index 0000000..db1c17b
Binary files /dev/null and b/docs/dqx.png differ
diff --git a/docs/dqx_lakehouse.png b/docs/dqx_lakehouse.png
new file mode 100644
index 0000000..a8340ce
Binary files /dev/null and b/docs/dqx_lakehouse.png differ
diff --git a/docs/dqx_quarantine.png b/docs/dqx_quarantine.png
new file mode 100644
index 0000000..c4f8a20
Binary files /dev/null and b/docs/dqx_quarantine.png differ
diff --git a/src/databricks/labs/dqx/col_functions.py b/src/databricks/labs/dqx/col_functions.py
index 62075c4..f1f24f6 100644
--- a/src/databricks/labs/dqx/col_functions.py
+++ b/src/databricks/labs/dqx/col_functions.py
@@ -21,7 +21,7 @@ def make_condition(condition: Column, message: Column | str, alias: str) -> Colu
     return (F.when(condition, msg_col).otherwise(F.lit(None).cast("string"))).alias(_cleanup_alias_name(alias))


-def _cleanup_alias_name(col_name: str):
+def _cleanup_alias_name(col_name: str) -> str:
     # avoid issues with structs
     return col_name.replace(".", "_")

@@ -116,7 +116,7 @@ def sql_expression(expression: str, msg: str | None = None, name: str | None = N
     :param expression: SQL expression
     :param msg: optional message of the `Column` type, automatically generated if None
-    :param name: optional name of the column, automatically generated if None
+    :param name: optional name of the resulting column, automatically generated if None
     :param negate: if the condition should be negated (true) or not. For example, "col is not null" will mark null values as "bad". Although sometimes it's easier to specify it other way around "col is null" + negate set to False
     :return: new Column
diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py
index ec770b1..131e18f 100644
--- a/src/databricks/labs/dqx/engine.py
+++ b/src/databricks/labs/dqx/engine.py
@@ -169,12 +169,30 @@ def apply_checks_and_split(df: DataFrame, checks: list[DQRule]) -> tuple[DataFra
     checked_df = apply_checks(df, checks)

-    good_df = checked_df.where(F.col(Columns.ERRORS.value).isNull()).drop(Columns.ERRORS.value, Columns.WARNINGS.value)
-    bad_df = checked_df.where(F.col(Columns.ERRORS.value).isNotNull() | F.col(Columns.WARNINGS.value).isNotNull())
+    good_df = get_valid(checked_df)
+    bad_df = get_invalid(checked_df)

     return good_df, bad_df


+def get_invalid(df: DataFrame) -> DataFrame:
+    """
+    Get records that violate data quality checks.
+    @param df: input DataFrame.
+    @return: dataframe with error and warning rows and corresponding reporting columns.
+    """
+    return df.where(F.col(Columns.ERRORS.value).isNotNull() | F.col(Columns.WARNINGS.value).isNotNull())
+
+
+def get_valid(df: DataFrame) -> DataFrame:
+    """
+    Get records that don't violate data quality checks.
+    @param df: input DataFrame.
+    @return: dataframe with rows that have no errors (warnings are allowed), without the reporting columns.
+    """
+    return df.where(F.col(Columns.ERRORS.value).isNull()).drop(Columns.ERRORS.value, Columns.WARNINGS.value)
+
+
 def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]:
     """Build checks based on check specification, i.e. function name plus arguments.
diff --git a/src/databricks/labs/dqx/profiler/profiler.py b/src/databricks/labs/dqx/profiler/profiler.py
index 83b74b9..26a1537 100644
--- a/src/databricks/labs/dqx/profiler/profiler.py
+++ b/src/databricks/labs/dqx/profiler/profiler.py
@@ -32,7 +32,7 @@ def do_cast(value: str | None, typ: T.DataType) -> Any | None:
 def get_df_summary_as_dict(df: DataFrame) -> dict[str, Any]:
     """Generate summary for Dataframe & return it as dictionary with column name as a key, and dict of metric/value
     :param df: dataframe to profile
     :return: dict with metrics per column
     """
     sm_dict: dict[str, dict] = {}
@@ -233,7 +233,7 @@ def get_columns_or_fields(cols: list[T.StructField]) -> list[T.StructField]:
 # TODO: split into managebale chunks
 # TODO: how to handle maps, arrays & structs?
 # TODO: return not only DQ rules, but also the profiling results - use named tuple?
-def profile_dataframe(
+def profile(
     df: DataFrame, cols: list[str] | None = None, opts: dict[str, Any] | None = None
 ) -> tuple[dict[str, Any], list[DQProfile]]:
     if opts is None:
@@ -254,12 +254,12 @@ def profile_dataframe(
     max_nulls = opts.get("max_null_ratio", 0)
     trim_strings = opts.get("trim_strings", True)

-    profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings)
+    _profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings)

     return summary_stats, dq_rules

-def profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings):
+def _profile(df, df_cols, dq_rules, max_nulls, opts, summary_stats, total_count, trim_strings):
     # TODO: think, how we can do it in fewer passes. Maybe only for specific things, like, min_max, etc.
     for field in get_columns_or_fields(df_cols):
         field_name = field.name
diff --git a/tests/unit/test_profiler.py b/tests/unit/test_profiler.py
index 139751f..a40c240 100644
--- a/tests/unit/test_profiler.py
+++ b/tests/unit/test_profiler.py
@@ -6,7 +6,7 @@
     DQProfile,
     T,
     get_columns_or_fields,
-    profile_dataframe,
+    profile,
 )


@@ -82,7 +82,8 @@ def test_profiler(spark_session: SparkSession):
         ],
         schema=inp_schema,
     )
-    stats, rules = profile_dataframe(inp_df)
+
+    stats, rules = profile(inp_df)

     expected_rules = [
         DQProfile(name="is_not_null", column="t1", description=None, parameters=None),
@@ -114,7 +115,7 @@
 def test_profiler_empty_df(spark_session: SparkSession):
     test_df = spark_session.createDataFrame([], "data: string")

-    stats, rules = profile_dataframe(test_df)
+    actual_summary_stats, actual_dq_rules = profile(test_df)

-    assert len(stats.keys()) > 0
-    assert len(rules) == 0
+    assert len(actual_summary_stats.keys()) > 0
+    assert len(actual_dq_rules) == 0